You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by ConeyLiu <gi...@git.apache.org> on 2018/09/09 11:32:41 UTC

[GitHub] spark pull request #22371: [SPARK-25386][CORE] Don't need to synchronize the...

GitHub user ConeyLiu opened a pull request:

    https://github.com/apache/spark/pull/22371

    [SPARK-25386][CORE] Don't need to synchronize the IndexShuffleBlockResolver for each writeIndexFileAndCommit

    
    ## What changes were proposed in this pull request?
    
    Now, we need synchronize the instance of IndexShuffleBlockResolver in order to make the commit check and tmp file rename atomically. This can be improved. We could synchronize a lock which is different for each `shuffleId + mapId` instead of  synchronize the indexShuffleBlockResolver for each writeIndexFileAndCommit.
    
    This should be an optimization with space for time, but it doesn't take up a lot of space.
    
    ## How was this patch tested?
    
    Existing UT.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ConeyLiu/spark indexShuffleBlockResolver

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/22371.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #22371
    
----
commit 92c2e07171f60b977c62661ea6475486a1599b19
Author: Xianyang Liu <xi...@...>
Date:   2018-09-09T10:44:23Z

    don't need synchronized the IndexShuffleBlockResolver for each writeIndexFileAndCommit

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22371: [SPARK-25386][CORE] Don't need to synchronize the...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22371#discussion_r216369925
  
    --- Diff: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ---
    @@ -138,13 +154,22 @@ private[spark] class IndexShuffleBlockResolver(
           mapId: Int,
           lengths: Array[Long],
           dataTmp: File): Unit = {
    +    val mapLocks = shuffleIdToLocks.get(shuffleId)
    +    require(mapLocks != null, "Shuffle should be registered to IndexShuffleBlockResolver first")
    +    val lock = mapLocks.synchronized {
    --- End diff --
    
    in most cases, you'll still have contention on this lock, right, its just a much smaller critical section?  As the usual case is for multiple tasks in the same executor to be for the same stage.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22371: [SPARK-25386][CORE] Don't need to synchronize the...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22371#discussion_r216166979
  
    --- Diff: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ---
    @@ -51,6 +52,8 @@ private[spark] class IndexShuffleBlockResolver(
     
       private val transportConf = SparkTransportConf.fromSparkConf(conf, "shuffle")
     
    +  private val shuffleIdToLocks = new ConcurrentHashMap[Int, Array[Object]]()
    --- End diff --
    
    How about `Array[_]` if the type is truly irrelevant? But I get that it's always going to be an Object given the usage here.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22371: [SPARK-25386][CORE] Don't need to synchronize the IndexS...

Posted by ConeyLiu <gi...@git.apache.org>.
Github user ConeyLiu commented on the issue:

    https://github.com/apache/spark/pull/22371
  
    OK, thanks everyone for the help. Close it


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22371: [SPARK-25386][CORE] Don't need to synchronize the IndexS...

Posted by ConeyLiu <gi...@git.apache.org>.
Github user ConeyLiu commented on the issue:

    https://github.com/apache/spark/pull/22371
  
    Thanks @felixcheung, @srowen, @cloud-fan for your time. There is only one instance of `IndexShuffleBlockResolver` per executor, and the synchronize is used to protect the modify safely when there are same tasks with different attempt update at the same time. The synchronize is unnecessary for most of the tasks, and the modify is very simple.
    
    I have tested locally, the results as follow. I admit that this change brings little improvement to complex tasks, but it does not cause performance degradation.
    
    `./spark-shell --master local[20] --driver-memory 40g`
    `spark.range(0, 10000000, 1, 100).repartition(200).count()`
    
    before: 
    
    map | reduce
    ---- | ---
    2s | 0.4s
    0.8s |  0.2s
    0.7s |  0.2s
    
    after:
    
    map | reduce
    ---- | ---
    0.8s | 0.2s
    0.5s |  0.4s
    0.5s |  0.2s


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22371: [SPARK-25386][CORE] Don't need to synchronize the IndexS...

Posted by felixcheung <gi...@git.apache.org>.
Github user felixcheung commented on the issue:

    https://github.com/apache/spark/pull/22371
  
    + @srowen @squito @JoshRosen 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22371: [SPARK-25386][CORE] Don't need to synchronize the IndexS...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22371
  
    Can one of the admins verify this patch?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22371: [SPARK-25386][CORE] Don't need to synchronize the IndexS...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22371
  
    Can one of the admins verify this patch?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22371: [SPARK-25386][CORE] Don't need to synchronize the IndexS...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/22371
  
    My opinion is, it's not worth to spend time on it. The lock is not likely to be a bottleneck and it's better to keep it simple even it's sub-optimal.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22371: [SPARK-25386][CORE] Don't need to synchronize the IndexS...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/22371
  
    How much perf can we save here? I don't think shuffle writing will be bottlenecked by this lock.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22371: [SPARK-25386][CORE] Don't need to synchronize the...

Posted by ConeyLiu <gi...@git.apache.org>.
Github user ConeyLiu closed the pull request at:

    https://github.com/apache/spark/pull/22371


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22371: [SPARK-25386][CORE] Don't need to synchronize the...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22371#discussion_r216167054
  
    --- Diff: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ---
    @@ -138,13 +148,22 @@ private[spark] class IndexShuffleBlockResolver(
           mapId: Int,
           lengths: Array[Long],
           dataTmp: File): Unit = {
    +    shuffleIdToLocks.putIfAbsent(shuffleId, new Array[Object](lengths.length))
    --- End diff --
    
    Rather than put, then get, just `shuffleIdToLocks.computeIfAbsent(shuffleId, () => new Array[Object](lengths.length))`. Something like that should work and maybe avoid even creating the array unless it is needed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22371: [SPARK-25386][CORE] Don't need to synchronize the IndexS...

Posted by ConeyLiu <gi...@git.apache.org>.
Github user ConeyLiu commented on the issue:

    https://github.com/apache/spark/pull/22371
  
    @squito , thanks for the review. I intend to using `ConcurrentHashMap[Int, AtomicReferenceArray]` previously.
    
    After re-think the code, I can know the lock here is used to prevent the same task with different attempt to commit the shuffle writer result at the same time. The task has a different attempt can be caused by follows:
    
    1. Failed task or stage. In this case, the previous task attempt should already finish(failed or killed) or the result is not used anymore.
    
    2. `Speculative task`. In this case, the speculative task can't be scheduled to the same executor as other attempts.
    
    So, what's real value for the lock. Maybe I'm wrong, hopeful some answers.
    



---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22371: [SPARK-25386][CORE] Don't need to synchronize the...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22371#discussion_r216383808
  
    --- Diff: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ---
    @@ -138,13 +154,22 @@ private[spark] class IndexShuffleBlockResolver(
           mapId: Int,
           lengths: Array[Long],
           dataTmp: File): Unit = {
    +    val mapLocks = shuffleIdToLocks.get(shuffleId)
    +    require(mapLocks != null, "Shuffle should be registered to IndexShuffleBlockResolver first")
    +    val lock = mapLocks.synchronized {
    --- End diff --
    
    The theory is many fewer threads would contend here because it's per-shuffleID.
    
    If it's an issue, then your idea of a second-level ConcurrentHashMap might help. It's more complex than a usual Map but can allow for safe concurrent access by a limited number of threads.
    
    Otherwise it might be overkill as the second-level Map.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22371: [SPARK-25386][CORE] Don't need to synchronize the...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22371#discussion_r216167004
  
    --- Diff: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ---
    @@ -76,6 +79,13 @@ private[spark] class IndexShuffleBlockResolver(
             logWarning(s"Error deleting index ${file.getPath()}")
           }
         }
    +
    +    // This should be called when we unregister shuffle from ShuffleManager, so it's safe to set
    +    // null for given shuffleId.
    +    val locks = shuffleIdToLocks.get(shuffleId)
    --- End diff --
    
    Just `shuffleIdLocks.remove(shuffleId)` ?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22371: [SPARK-25386][CORE] Don't need to synchronize the IndexS...

Posted by ConeyLiu <gi...@git.apache.org>.
Github user ConeyLiu commented on the issue:

    https://github.com/apache/spark/pull/22371
  
    @cloud-fan @jiangxb1987 Could you help to review this? Thanks a lot.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22371: [SPARK-25386][CORE] Don't need to synchronize the...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22371#discussion_r216389817
  
    --- Diff: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ---
    @@ -138,13 +154,22 @@ private[spark] class IndexShuffleBlockResolver(
           mapId: Int,
           lengths: Array[Long],
           dataTmp: File): Unit = {
    +    val mapLocks = shuffleIdToLocks.get(shuffleId)
    +    require(mapLocks != null, "Shuffle should be registered to IndexShuffleBlockResolver first")
    +    val lock = mapLocks.synchronized {
    --- End diff --
    
    Oh I see, then your proposal could actually resolve the contention. With ConcurrentHashMap this synchronized block can be an updateIfAbsent call that might not even contend.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22371: [SPARK-25386][CORE] Don't need to synchronize the IndexS...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22371
  
    Can one of the admins verify this patch?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22371: [SPARK-25386][CORE] Don't need to synchronize the...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22371#discussion_r216167356
  
    --- Diff: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ---
    @@ -138,13 +148,22 @@ private[spark] class IndexShuffleBlockResolver(
           mapId: Int,
           lengths: Array[Long],
           dataTmp: File): Unit = {
    +    shuffleIdToLocks.putIfAbsent(shuffleId, new Array[Object](lengths.length))
    +    val mapLocks = shuffleIdToLocks.get(shuffleId)
    +    val lock = mapLocks.synchronized {
    +      if (mapLocks(mapId) == null) {
    +        mapLocks(mapId) = new Object()
    +      }
    +      mapLocks(mapId)
    +    }
    +
         val indexFile = getIndexFile(shuffleId, mapId)
         val indexTmp = Utils.tempFileWith(indexFile)
         try {
           val dataFile = getDataFile(shuffleId, mapId)
    -      // There is only one IndexShuffleBlockResolver per executor, this synchronization make sure
    -      // the following check and rename are atomic.
    -      synchronized {
    +      // We need make sure the following check and rename are atomic, and we only need to
    --- End diff --
    
    I don't know the logic well enough to really evaluate this, but it looks plausible. It looks like this block operates on `indexFile` and `dataFile` and things derived from them, and those appear to be keyed by `shuffleId` and `mapId`, so sounds plausible that there is no need to synchronize access when different shuffle or map IDs are used.
    
    I see the synchronized block does things like delete `indexFile`. This is read outside the synchronized block. I wonder if there is an issue there? should this really be checking for the file inside a block that excludes deletion of that file at the same time?
    
    Granted that is the existing logic here and maybe OK to not touch that now, but I did wonder when trying to reason about this.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22371: [SPARK-25386][CORE] Don't need to synchronize the IndexS...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/22371
  
    @ConeyLiu we may have an executor lost and then come back, and may have 2 same tasks running on the same executor.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22371: [SPARK-25386][CORE] Don't need to synchronize the...

Posted by ConeyLiu <gi...@git.apache.org>.
Github user ConeyLiu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22371#discussion_r216304910
  
    --- Diff: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ---
    @@ -51,6 +52,8 @@ private[spark] class IndexShuffleBlockResolver(
     
       private val transportConf = SparkTransportConf.fromSparkConf(conf, "shuffle")
     
    +  private val shuffleIdToLocks = new ConcurrentHashMap[Int, Array[Object]]()
    --- End diff --
    
    Seems `Object` can't put into `Array[_]` directly, maybe need some casts. 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22371: [SPARK-25386][CORE] Don't need to synchronize the...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22371#discussion_r216387272
  
    --- Diff: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ---
    @@ -138,13 +154,22 @@ private[spark] class IndexShuffleBlockResolver(
           mapId: Int,
           lengths: Array[Long],
           dataTmp: File): Unit = {
    +    val mapLocks = shuffleIdToLocks.get(shuffleId)
    +    require(mapLocks != null, "Shuffle should be registered to IndexShuffleBlockResolver first")
    +    val lock = mapLocks.synchronized {
    --- End diff --
    
    in the usual case, multiple threads are still sharing the same shuffleID, they're just writing to different map tasks.  (they are in the simple example job @ConeyLiu  gave, of `spark.range(0, 10000000, 1, 100).repartition(200).count()`)


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org