You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2019/09/05 06:20:54 UTC
[spark] branch master updated: [SPARK-28976][CORE] Use KeyLock to simplify MapOutputTracker.getStatuses
This is an automated email from the ASF dual-hosted git repository.
zsxwing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 84a4d3a [SPARK-28976][CORE] Use KeyLock to simplify MapOutputTracker.getStatuses
84a4d3a is described below
commit 84a4d3a17ccbf7e0cb75dffbbdc20a26715f7323
Author: Shixiong Zhu <zs...@gmail.com>
AuthorDate: Wed Sep 4 23:20:27 2019 -0700
[SPARK-28976][CORE] Use KeyLock to simplify MapOutputTracker.getStatuses
### What changes were proposed in this pull request?
Use `KeyLock` (added in #25612) to simplify `MapOutputTracker.getStatuses`. The refactoring also brings some improvements:
- `InterruptedException` is no longer swallowed.
- When the map statuses for one shuffle have been fetched, threads waiting on unrelated shuffles are no longer woken up.
### Why are the changes needed?
`MapOutputTracker.getStatuses` is currently hard to maintain because it uses a special locking mechanism that we need to pay attention to whenever this method is updated. Since `KeyLock` hides the locking complexity behind a dedicated lock class, it's better to refactor the method to make it easier to understand and maintain.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
Existing tests.
Closes #25680 from zsxwing/getStatuses.
Authored-by: Shixiong Zhu <zs...@gmail.com>
Signed-off-by: Shixiong Zhu <zs...@gmail.com>
---
.../scala/org/apache/spark/MapOutputTracker.scala | 50 +++++-----------------
1 file changed, 10 insertions(+), 40 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 5c820f5..d878fc5 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -678,8 +678,11 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
val mapStatuses: Map[Int, Array[MapStatus]] =
new ConcurrentHashMap[Int, Array[MapStatus]]().asScala
- /** Remembers which map output locations are currently being fetched on an executor. */
- private val fetching = new HashSet[Int]
+ /**
+ * A [[KeyLock]] whose key is a shuffle id to ensure there is only one thread fetching
+ * the same shuffle block.
+ */
+ private val fetchingLock = new KeyLock[Int]
// Get blocks sizes by executor Id. Note that zero-sized blocks are excluded in the result.
override def getMapSizesByExecutorId(shuffleId: Int, startPartition: Int, endPartition: Int)
@@ -707,51 +710,18 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
if (statuses == null) {
logInfo("Don't have map outputs for shuffle " + shuffleId + ", fetching them")
val startTimeNs = System.nanoTime()
- var fetchedStatuses: Array[MapStatus] = null
- fetching.synchronized {
- // Someone else is fetching it; wait for them to be done
- while (fetching.contains(shuffleId)) {
- try {
- fetching.wait()
- } catch {
- case e: InterruptedException =>
- }
- }
-
- // Either while we waited the fetch happened successfully, or
- // someone fetched it in between the get and the fetching.synchronized.
- fetchedStatuses = mapStatuses.get(shuffleId).orNull
+ fetchingLock.withLock(shuffleId) {
+ var fetchedStatuses = mapStatuses.get(shuffleId).orNull
if (fetchedStatuses == null) {
- // We have to do the fetch, get others to wait for us.
- fetching += shuffleId
- }
- }
-
- if (fetchedStatuses == null) {
- // We won the race to fetch the statuses; do so
- logInfo("Doing the fetch; tracker endpoint = " + trackerEndpoint)
- // This try-finally prevents hangs due to timeouts:
- try {
+ logInfo("Doing the fetch; tracker endpoint = " + trackerEndpoint)
val fetchedBytes = askTracker[Array[Byte]](GetMapOutputStatuses(shuffleId))
fetchedStatuses = MapOutputTracker.deserializeMapStatuses(fetchedBytes)
logInfo("Got the output locations")
mapStatuses.put(shuffleId, fetchedStatuses)
- } finally {
- fetching.synchronized {
- fetching -= shuffleId
- fetching.notifyAll()
- }
}
- }
- logDebug(s"Fetching map output statuses for shuffle $shuffleId took " +
- s"${TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs)} ms")
-
- if (fetchedStatuses != null) {
+ logDebug(s"Fetching map output statuses for shuffle $shuffleId took " +
+ s"${TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs)} ms")
fetchedStatuses
- } else {
- logError("Missing all output locations for shuffle " + shuffleId)
- throw new MetadataFetchFailedException(
- shuffleId, -1, "Missing all output locations for shuffle " + shuffleId)
}
} else {
statuses
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org