Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2019/08/29 09:23:54 UTC

[GitHub] [spark] juliuszsompolski commented on a change in pull request #25612: [SPARK-3137][Core]Replace the global TorrentBroadcast lock with fine grained locks

juliuszsompolski commented on a change in pull request #25612: [SPARK-3137][Core]Replace the global TorrentBroadcast lock with fine grained locks
URL: https://github.com/apache/spark/pull/25612#discussion_r318971022
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
 ##########
 @@ -290,6 +293,42 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
 
 private object TorrentBroadcast extends Logging {
 
+  /** Locks to ensure there is only one thread fetching the same [[TorrentBroadcast]] block. */
+  private val torrentBroadcastLock = new ConcurrentHashMap[BroadcastBlockId, AnyRef]()
+
+  /** Acquire a lock for fetching a [[TorrentBroadcast]] block. */
+  private def acquireTorrentBroadcastLock(broadcastId: BroadcastBlockId): Unit = {
+    while (true) {
+      val lock = torrentBroadcastLock.putIfAbsent(broadcastId, new Object)
+      if (lock == null) return
+      lock.synchronized {
+        while (torrentBroadcastLock.get(broadcastId) eq lock) {
+          lock.wait()
 
 Review comment:
   Why not just stripe over a fixed number of ReentrantLocks? That should reduce contention enough (and potential collisions can be controlled with numLocks), is easy to understand, and doesn't reimplement its own lock queues. It's also fully interruptible, whereas in the current implementation the threads queueing for the lock are not interruptible.
   ```
   private val numLocks = 100
   private val locks = (1 to numLocks).map(_ => new java.util.concurrent.locks.ReentrantLock)
   
   private def withTorrentBroadcastLock[T](broadcastId: BroadcastBlockId)(func: => T): T = {
     // broadcastId.broadcastId is a Long, so convert the stripe index to Int before indexing.
     val lock = locks((broadcastId.broadcastId % numLocks).toInt)
     lock.lockInterruptibly()
     try {
       func
     } finally {
       lock.unlock()
     }
   }
   ```
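
   To make the striping idea concrete outside of Spark, here is a minimal self-contained sketch. `StripedLocks`, `stripeOf`, `withLock` and `StripedLocksDemo` are hypothetical names used only for illustration, and a plain `Long` key stands in for `BroadcastBlockId`. It assumes Java 8+ for `Math.floorMod`; it is not the code proposed for this PR.

   ```scala
   import java.util.concurrent.locks.ReentrantLock

   // Hypothetical helper, not part of Spark: a fixed pool of locks shared by all keys.
   final class StripedLocks(numLocks: Int) {
     require(numLocks > 0, "numLocks must be positive")

     private val locks = Array.fill(numLocks)(new ReentrantLock)

     // floorMod keeps the index in [0, numLocks) even if the key is negative.
     def stripeOf(key: Long): Int = Math.floorMod(key, numLocks.toLong).toInt

     // Runs body while holding the stripe for key; waiting threads can be interrupted.
     def withLock[T](key: Long)(body: => T): T = {
       val lock = locks(stripeOf(key))
       lock.lockInterruptibly()
       try body finally lock.unlock()
     }
   }

   object StripedLocksDemo {
     // Eight threads increment one counter, all guarded by the stripe for key 7.
     def countWithLock(): Int = {
       val striped = new StripedLocks(4)
       var counter = 0
       val threads = (1 to 8).map { _ =>
         new Thread(new Runnable {
           def run(): Unit = {
             for (_ <- 1 to 1000) striped.withLock(7L) { counter += 1 }
           }
         })
       }
       threads.foreach(_.start())
       threads.foreach(_.join())
       counter
     }
   }
   ```

   Two different keys can map to the same stripe (here keys 3 and 7 both land on stripe 3), which only costs some extra waiting. Because the locks are reentrant, a thread that already holds a stripe can take it again without deadlocking on itself.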
