You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2021/03/26 14:59:30 UTC

[GitHub] [spark] Ngone51 commented on a change in pull request #30480: [SPARK-32921][SHUFFLE] MapOutputTracker extensions to support push-based shuffle

Ngone51 commented on a change in pull request #30480:
URL: https://github.com/apache/spark/pull/30480#discussion_r602266320



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -833,33 +1129,44 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
    *
    * (It would be nice to remove this restriction in the future.)
    */
-  private def getStatuses(shuffleId: Int, conf: SparkConf): Array[MapStatus] = {
-    val statuses = mapStatuses.get(shuffleId).orNull
-    if (statuses == null) {
-      logInfo("Don't have map outputs for shuffle " + shuffleId + ", fetching them")
+  private def getStatuses(
+      shuffleId: Int, conf: SparkConf): (Array[MapStatus], Array[MergeStatus]) = {
+    val mapOutputStatuses = mapStatuses.get(shuffleId).orNull
+    val mergeResultStatuses = mergeStatuses.get(shuffleId).orNull
+    if (mapOutputStatuses == null || (fetchMergeResult && mergeResultStatuses == null)) {
+      logInfo("Don't have map/merge outputs for shuffle " + shuffleId + ", fetching them")

Review comment:
       nit:
   ```suggestion
         logInfo(s"Don't have map/merge outputs for shuffle $shuffleId, fetching them")
   ```

##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -833,33 +1129,44 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
    *
    * (It would be nice to remove this restriction in the future.)
    */
-  private def getStatuses(shuffleId: Int, conf: SparkConf): Array[MapStatus] = {
-    val statuses = mapStatuses.get(shuffleId).orNull
-    if (statuses == null) {
-      logInfo("Don't have map outputs for shuffle " + shuffleId + ", fetching them")
+  private def getStatuses(
+      shuffleId: Int, conf: SparkConf): (Array[MapStatus], Array[MergeStatus]) = {
+    val mapOutputStatuses = mapStatuses.get(shuffleId).orNull
+    val mergeResultStatuses = mergeStatuses.get(shuffleId).orNull
+    if (mapOutputStatuses == null || (fetchMergeResult && mergeResultStatuses == null)) {
+      logInfo("Don't have map/merge outputs for shuffle " + shuffleId + ", fetching them")
       val startTimeNs = System.nanoTime()
       fetchingLock.withLock(shuffleId) {
-        var fetchedStatuses = mapStatuses.get(shuffleId).orNull
-        if (fetchedStatuses == null) {
-          logInfo("Doing the fetch; tracker endpoint = " + trackerEndpoint)
+        var fetchedMapStatuses = mapStatuses.get(shuffleId).orNull
+        if (fetchedMapStatuses == null) {
+          logInfo("Doing the map fetch; tracker endpoint = " + trackerEndpoint)
           val fetchedBytes = askTracker[Array[Byte]](GetMapOutputStatuses(shuffleId))
-          fetchedStatuses = MapOutputTracker.deserializeMapStatuses(fetchedBytes, conf)
-          logInfo("Got the output locations")
-          mapStatuses.put(shuffleId, fetchedStatuses)
+          fetchedMapStatuses = MapOutputTracker.deserializeOutputStatuses(fetchedBytes, conf)
+          logInfo("Got the map output locations")
+          mapStatuses.put(shuffleId, fetchedMapStatuses)
+        }
+        var fetchedMergeStatues = mergeStatuses.get(shuffleId).orNull
+        if (fetchMergeResult && fetchedMergeStatues == null) {
+          logInfo("Doing the merge fetch; tracker endpoint = " + trackerEndpoint)
+          val fetchedBytes = askTracker[Array[Byte]](GetMergeResultStatuses(shuffleId))
+          fetchedMergeStatues = MapOutputTracker.deserializeOutputStatuses(fetchedBytes, conf)
+          logInfo("Got the merge output locations")
+          mergeStatuses.put(shuffleId, fetchedMergeStatues)
         }
-        logDebug(s"Fetching map output statuses for shuffle $shuffleId took " +
+        logDebug(s"Fetching map/merge output statuses for shuffle $shuffleId took " +

Review comment:
       nit:
   ```suggestion
           logDebug(s"Fetching map${if (fetchMergeResult) "/merge" else ""} output statuses for shuffle $shuffleId took " +
   ```

##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -86,36 +90,62 @@ private class ShuffleStatus(numPartitions: Int) extends Logging {
   // Exposed for testing
   val mapStatuses = new Array[MapStatus](numPartitions)
 
+  /**
+   * MergeStatus for each shuffle partition when push-based shuffle is enabled. The index of the
+   * array is the shuffle partition id (reduce id). Each value in the array is the MergeStatus for
+   * a shuffle partition, or null if not available. When push-based shuffle is enabled, this array
+   * provides a reducer oriented view of the shuffle status specifically for the results of
+   * merging shuffle partition blocks into per-partition merged shuffle files.

Review comment:
       "shuffle partition" is a bit confusing here, because the original `MapStatus` also uses the word `partition`. Maybe change all occurrences of "shuffle partition" to "shuffle reduce partition"?

##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -833,33 +1106,44 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
    *
    * (It would be nice to remove this restriction in the future.)
    */
-  private def getStatuses(shuffleId: Int, conf: SparkConf): Array[MapStatus] = {
-    val statuses = mapStatuses.get(shuffleId).orNull
-    if (statuses == null) {
-      logInfo("Don't have map outputs for shuffle " + shuffleId + ", fetching them")
+  private def getStatuses(
+      shuffleId: Int, conf: SparkConf): (Array[MapStatus], Array[MergeStatus]) = {
+    val mapOutputStatuses = mapStatuses.get(shuffleId).orNull
+    val mergeResultStatuses = mergeStatuses.get(shuffleId).orNull
+    if (mapOutputStatuses == null || (fetchMergeResult && mergeResultStatuses == null)) {
+      logInfo("Don't have map/merge outputs for shuffle " + shuffleId + ", fetching them")
       val startTimeNs = System.nanoTime()
       fetchingLock.withLock(shuffleId) {
-        var fetchedStatuses = mapStatuses.get(shuffleId).orNull
-        if (fetchedStatuses == null) {
-          logInfo("Doing the fetch; tracker endpoint = " + trackerEndpoint)
+        var fetchedMapStatuses = mapStatuses.get(shuffleId).orNull
+        if (fetchedMapStatuses == null) {
+          logInfo("Doing the map fetch; tracker endpoint = " + trackerEndpoint)
           val fetchedBytes = askTracker[Array[Byte]](GetMapOutputStatuses(shuffleId))
-          fetchedStatuses = MapOutputTracker.deserializeMapStatuses(fetchedBytes, conf)
-          logInfo("Got the output locations")
-          mapStatuses.put(shuffleId, fetchedStatuses)
+          fetchedMapStatuses = MapOutputTracker.deserializeOutputStatuses(fetchedBytes, conf)
+          logInfo("Got the map output locations")
+          mapStatuses.put(shuffleId, fetchedMapStatuses)
         }
-        logDebug(s"Fetching map output statuses for shuffle $shuffleId took " +
+        var fetchedMergeStatues = mergeStatuses.get(shuffleId).orNull
+        if (fetchMergeResult && fetchedMergeStatues == null) {
+          logInfo("Doing the merge fetch; tracker endpoint = " + trackerEndpoint)
+          val fetchedBytes = askTracker[Array[Byte]](GetMergeResultStatuses(shuffleId))
+          fetchedMergeStatues = MapOutputTracker.deserializeOutputStatuses(fetchedBytes, conf)
+          logInfo("Got the merge output locations")
+          mergeStatuses.put(shuffleId, fetchedMergeStatues)
+        }
+        logDebug(s"Fetching map/merge output statuses for shuffle $shuffleId took " +
           s"${TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs)} ms")
-        fetchedStatuses
+        (fetchedMapStatuses, fetchedMergeStatues)
       }

Review comment:
       I'd prefer to combine the two fetches (`GetMapOutputStatuses` and `GetMergeResultStatuses`) into a single RPC. Actually, the first time I reviewed this PR, I started thinking about a unified way to provide a consistent API for both map statuses and merge statuses in `MapOutputTracker` & `ShuffleStatus`. Unfortunately, I didn't come up with a good idea.
   
   I think a single RPC would simplify error handling for us. How much additional complexity do you expect it would add?
   
   I'd also suggest adding a new RPC for the combined case and leaving the current one as is, so that we don't affect the existing code path when push-based shuffle is disabled.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org