Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2021/03/28 23:31:34 UTC

[GitHub] [spark] venkata91 commented on a change in pull request #30480: [SPARK-32921][SHUFFLE] MapOutputTracker extensions to support push-based shuffle

venkata91 commented on a change in pull request #30480:
URL: https://github.com/apache/spark/pull/30480#discussion_r602949966



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -449,21 +605,32 @@ private[spark] class MapOutputTrackerMaster(
       try {
         while (true) {
           try {
-            val data = mapOutputRequests.take()
-             if (data == PoisonPill) {
+            val data = mapOutputTrackerMasterMessages.take()
+            if (data == PoisonPill) {
               // Put PoisonPill back so that other MessageLoops can see it.
-              mapOutputRequests.offer(PoisonPill)
+              mapOutputTrackerMasterMessages.offer(PoisonPill)
               return
             }
-            val context = data.context
-            val shuffleId = data.shuffleId
-            val hostPort = context.senderAddress.hostPort
-            logDebug("Handling request to send map output locations for shuffle " + shuffleId +
-              " to " + hostPort)
-            val shuffleStatus = shuffleStatuses.get(shuffleId).head
-            context.reply(
-              shuffleStatus.serializedMapStatus(broadcastManager, isLocal, minSizeForBroadcast,
-                conf))
+
+            data match {
+              case GetMapStatusMessage(shuffleId, context) =>
+                val hostPort = context.senderAddress.hostPort
+                val shuffleStatus = shuffleStatuses.get(shuffleId).head
+                logDebug("Handling request to send map output locations for shuffle " + shuffleId +
+                  " to " + hostPort)
+                context.reply(
+                  shuffleStatus.serializedOutputStatus(broadcastManager, isLocal,
+                    minSizeForBroadcast, conf, isMapOutput = true))
+
+              case GetMergeStatusMessage(shuffleId, context) =>
+                val hostPort = context.senderAddress.hostPort
+                val shuffleStatus = shuffleStatuses.get(shuffleId).head
+                logDebug("Handling request to send merge output locations for" +
+                  " shuffle " + shuffleId + " to " + hostPort)
+                context.reply(
+                  shuffleStatus.serializedOutputStatus(broadcastManager, isLocal,
+                    minSizeForBroadcast, conf, isMapOutput = false))
+            }

Review comment:
       We will be adding one more message, for getting shuffle mergers, as part of https://issues.apache.org/jira/browse/SPARK-34826, which is why I made this change.
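
A minimal sketch of why a single typed message queue makes that follow-up cheap, under stated assumptions: the names `TrackerMessage`, `GetMapStatus`, `GetMergeStatus`, `GetShuffleMergers`, and `MessageLoopSketch` are hypothetical simplifications and not the classes in this PR (the real messages also carry an RPC context and reply with serialized statuses). With one sealed message type, adding a request kind should only need a new case class and one more `case` in the match.

```scala
import java.util.concurrent.LinkedBlockingQueue

// Hypothetical, simplified message types; the messages in the PR also carry a context.
sealed trait TrackerMessage
case class GetMapStatus(shuffleId: Int) extends TrackerMessage
case class GetMergeStatus(shuffleId: Int) extends TrackerMessage
// Hypothetical stand-in for the shuffle-mergers request planned in SPARK-34826.
case class GetShuffleMergers(shuffleId: Int) extends TrackerMessage
case object PoisonPill extends TrackerMessage

object MessageLoopSketch {
  // Dispatch on the message type; a new message kind adds one case here.
  def handle(msg: TrackerMessage): String = msg match {
    case GetMapStatus(id)      => s"map statuses for shuffle $id"
    case GetMergeStatus(id)    => s"merge statuses for shuffle $id"
    case GetShuffleMergers(id) => s"shuffle mergers for shuffle $id"
    case PoisonPill            => "stop"
  }

  // Take messages until PoisonPill is seen, then put it back so that
  // other loops sharing the same queue can also see it and stop.
  def drain(queue: LinkedBlockingQueue[TrackerMessage]): List[String] = {
    val replies = List.newBuilder[String]
    var running = true
    while (running) {
      queue.take() match {
        case PoisonPill =>
          queue.offer(PoisonPill)
          running = false
        case msg =>
          replies += handle(msg)
      }
    }
    replies.result()
  }
}
```

Because the trait is sealed, the compiler can warn when a newly added message is not handled in `handle`, which is one reason to prefer this shape over separate queues per request type.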



