Posted to commits@spark.apache.org by zs...@apache.org on 2019/09/05 06:20:54 UTC

[spark] branch master updated: [SPARK-28976][CORE] Use KeyLock to simplify MapOutputTracker.getStatuses

This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 84a4d3a  [SPARK-28976][CORE] Use KeyLock to simplify MapOutputTracker.getStatuses
84a4d3a is described below

commit 84a4d3a17ccbf7e0cb75dffbbdc20a26715f7323
Author: Shixiong Zhu <zs...@gmail.com>
AuthorDate: Wed Sep 4 23:20:27 2019 -0700

    [SPARK-28976][CORE] Use KeyLock to simplify MapOutputTracker.getStatuses
    
    ### What changes were proposed in this pull request?
    
    Use `KeyLock` added in #25612 to simplify `MapOutputTracker.getStatuses`. The refactoring also brings a couple of improvements (see the sketch after this list):
    - `InterruptedException` is no longer swallowed.
    - When a shuffle block is fetched, we don't need to wake up unrelated sleeping threads.
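
    To illustrate the idea, here is a minimal, hypothetical sketch of a per-key lock in the spirit of `KeyLock` (the real `KeyLock` added in #25612 may differ in detail; this is not Spark's implementation). It shows the two properties above: waiting threads are parked on a monitor tied to their own key, so releasing one key never wakes threads waiting on another, and an interrupt while waiting surfaces as `InterruptedException` instead of being swallowed.
    
    ```scala
    import java.util.concurrent.ConcurrentHashMap
    
    // Hypothetical sketch, not Spark's actual KeyLock implementation.
    // At most one thread runs `fn` for a given key at a time.
    class PerKeyLock[K] {
      // Maps a key to a marker object owned by the thread currently holding that key.
      private val holders = new ConcurrentHashMap[K, AnyRef]()
    
      private def acquire(key: K): Unit = {
        while (true) {
          val marker = new AnyRef
          val existing = holders.putIfAbsent(key, marker)
          if (existing == null) {
            return // we now hold the lock for this key
          }
          // Wait on the monitor tied to this key's current holder, so releasing
          // an unrelated key never wakes us up.
          existing.synchronized {
            while (holders.get(key) eq existing) {
              existing.wait() // InterruptedException propagates to the caller
            }
          }
        }
      }
    
      private def release(key: K): Unit = {
        val marker = holders.remove(key)
        marker.synchronized {
          marker.notifyAll() // wake only the threads waiting for this key
        }
      }
    
      def withLock[T](key: K)(fn: => T): T = {
        acquire(key)
        try fn finally release(key)
      }
    }
    ```
    
    With this shape, `withLock(shuffleId) { ... }` replaces the hand-rolled `fetching` HashSet plus `wait`/`notifyAll` dance removed in the diff below.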
    
    ### Why are the changes needed?
    
    `MapOutputTracker.getStatuses` is pretty hard to maintain right now because it uses a hand-rolled locking mechanism that we need to pay attention to whenever updating this method. Since `KeyLock` hides the complexity of locking behind a dedicated lock class, it's better to refactor the method to make it easier to understand and maintain.
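
    For context, the refactored lookup follows a "check the cache, then double-check and fetch under a per-key lock" shape. The snippet below is a simplified, hypothetical illustration built on the `PerKeyLock` sketch above; `cache` and `fetchRemotely` are stand-ins for `mapStatuses` and the tracker RPC, not Spark APIs.
    
    ```scala
    import java.util.concurrent.ConcurrentHashMap
    
    // Simplified, hypothetical shape of the refactored lookup; not Spark code.
    object CachedFetch {
      private val cache = new ConcurrentHashMap[Int, Array[Long]]()
      private val fetchingLock = new PerKeyLock[Int]
    
      // Stand-in for the round trip to the tracker endpoint.
      private def fetchRemotely(shuffleId: Int): Array[Long] = Array(shuffleId.toLong)
    
      def getStatuses(shuffleId: Int): Array[Long] = {
        val cached = cache.get(shuffleId)
        if (cached != null) {
          cached
        } else {
          // Only one thread per shuffleId executes this block; the others wait
          // on this key alone and then find the value in the double-check below.
          fetchingLock.withLock(shuffleId) {
            var statuses = cache.get(shuffleId)
            if (statuses == null) {
              statuses = fetchRemotely(shuffleId)
              cache.put(shuffleId, statuses)
            }
            statuses
          }
        }
      }
    }
    ```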
    
    ### Does this PR introduce any user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Existing tests.
    
    Closes #25680 from zsxwing/getStatuses.
    
    Authored-by: Shixiong Zhu <zs...@gmail.com>
    Signed-off-by: Shixiong Zhu <zs...@gmail.com>
---
 .../scala/org/apache/spark/MapOutputTracker.scala  | 50 +++++-----------------
 1 file changed, 10 insertions(+), 40 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 5c820f5..d878fc5 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -678,8 +678,11 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
   val mapStatuses: Map[Int, Array[MapStatus]] =
     new ConcurrentHashMap[Int, Array[MapStatus]]().asScala
 
-  /** Remembers which map output locations are currently being fetched on an executor. */
-  private val fetching = new HashSet[Int]
+  /**
+   * A [[KeyLock]] whose key is a shuffle id to ensure there is only one thread fetching
+   * the same shuffle block.
+   */
+  private val fetchingLock = new KeyLock[Int]
 
   // Get blocks sizes by executor Id. Note that zero-sized blocks are excluded in the result.
   override def getMapSizesByExecutorId(shuffleId: Int, startPartition: Int, endPartition: Int)
@@ -707,51 +710,18 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
     if (statuses == null) {
       logInfo("Don't have map outputs for shuffle " + shuffleId + ", fetching them")
       val startTimeNs = System.nanoTime()
-      var fetchedStatuses: Array[MapStatus] = null
-      fetching.synchronized {
-        // Someone else is fetching it; wait for them to be done
-        while (fetching.contains(shuffleId)) {
-          try {
-            fetching.wait()
-          } catch {
-            case e: InterruptedException =>
-          }
-        }
-
-        // Either while we waited the fetch happened successfully, or
-        // someone fetched it in between the get and the fetching.synchronized.
-        fetchedStatuses = mapStatuses.get(shuffleId).orNull
+      fetchingLock.withLock(shuffleId) {
+        var fetchedStatuses = mapStatuses.get(shuffleId).orNull
         if (fetchedStatuses == null) {
-          // We have to do the fetch, get others to wait for us.
-          fetching += shuffleId
-        }
-      }
-
-      if (fetchedStatuses == null) {
-        // We won the race to fetch the statuses; do so
-        logInfo("Doing the fetch; tracker endpoint = " + trackerEndpoint)
-        // This try-finally prevents hangs due to timeouts:
-        try {
+          logInfo("Doing the fetch; tracker endpoint = " + trackerEndpoint)
           val fetchedBytes = askTracker[Array[Byte]](GetMapOutputStatuses(shuffleId))
           fetchedStatuses = MapOutputTracker.deserializeMapStatuses(fetchedBytes)
           logInfo("Got the output locations")
           mapStatuses.put(shuffleId, fetchedStatuses)
-        } finally {
-          fetching.synchronized {
-            fetching -= shuffleId
-            fetching.notifyAll()
-          }
         }
-      }
-      logDebug(s"Fetching map output statuses for shuffle $shuffleId took " +
-        s"${TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs)} ms")
-
-      if (fetchedStatuses != null) {
+        logDebug(s"Fetching map output statuses for shuffle $shuffleId took " +
+          s"${TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs)} ms")
         fetchedStatuses
-      } else {
-        logError("Missing all output locations for shuffle " + shuffleId)
-        throw new MetadataFetchFailedException(
-          shuffleId, -1, "Missing all output locations for shuffle " + shuffleId)
       }
     } else {
       statuses


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org