Posted to reviews@spark.apache.org by "JoshRosen (via GitHub)" <gi...@apache.org> on 2023/04/17 23:43:48 UTC

[GitHub] [spark] JoshRosen commented on a diff in pull request #40690: [SPARK-43043][CORE] Improve the performance of MapOutputTracker.updateMapOutput

JoshRosen commented on code in PR #40690:
URL: https://github.com/apache/spark/pull/40690#discussion_r1169356722


##########
core/src/main/scala/org/apache/spark/MapOutputTracker.scala:
##########
@@ -157,22 +164,29 @@ private class ShuffleStatus(
       invalidateSerializedMapOutputStatusCache()
     }
     mapStatuses(mapIndex) = status
+    mapIdToMapIndex(status.mapId) = mapIndex
   }
 
   /**
    * Update the map output location (e.g. during migration).
    */
   def updateMapOutput(mapId: Long, bmAddress: BlockManagerId): Unit = withWriteLock {
     try {
-      val mapStatusOpt = mapStatuses.find(x => x != null && x.mapId == mapId)
+      // OpenHashMap would return 0 if the key doesn't exist.
+      val mapIndex = if (mapIdToMapIndex.contains(mapId)) {
+        Some(mapIdToMapIndex(mapId))
+      } else {
+        None
+      }
+      val mapStatusOpt = mapIndex.map(mapStatuses(_))
       mapStatusOpt match {
         case Some(mapStatus) =>
           logInfo(s"Updating map output for ${mapId} to ${bmAddress}")
           mapStatus.updateLocation(bmAddress)
           invalidateSerializedMapOutputStatusCache()
         case None =>
-          val index = mapStatusesDeleted.indexWhere(x => x != null && x.mapId == mapId)
-          if (index >= 0 && mapStatuses(index) == null) {
+          if (mapIndex.nonEmpty && mapStatuses(mapIndex.get) == null) {
+            val index = mapIndex.get

Review Comment:
   I'm curious about whether there's a corner-case here that we might need to handle:
   
   - Partition 0 is computed by mapId = TID0.
   - That map output is lost, so mapStatusesDeleted(0).mapId == TID0.
   - Partition 0 is recomputed by mapId = TID1.
   - That recomputed map output is also lost, so we overwrite the entry and now mapStatusesDeleted(0).mapId == TID1.
   - The _original_ map output is recovered, so we invoke this `updateMapOutput` path with `mapId = TID0`.
   - Our index mapping returns index 0, so we use mapStatusesDeleted(0), whose mapId is TID1. That is the map status from the _second_ attempt (also lost), even though the data we recovered came from the first attempt.
   
   This is not a problem as long as the map outputs are identical across attempts. If those outputs _weren't_ identical, though, then mixing up the map statuses could cause correctness problems. However, I think that would only matter if it changed the set of fetches that are performed, e.g. a reducer would fetch output from a mapper under one attempt's status but not under the other's, because the two attempts differ in whether they send output to that reducer. And I think that case can only occur when there are already other correctness issues from non-deterministic recomputation.
   
   To avoid having to worry about that edge-case, WDYT about doing something like
   
   ```suggestion
             if (mapIndex.nonEmpty
                 && mapStatuses(mapIndex.get) == null
                 && mapStatusesDeleted(mapIndex.get).mapId == mapId) {
               val index = mapIndex.get
   ```
   
   ?
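   
   To make the scenario concrete, here is a minimal standalone sketch of the mixup and of what the extra `mapId` comparison changes. This is not Spark code: `Status`, `StaleIndexSketch`, `add`, `lose`, `lookupUnguarded`, and `lookupGuarded` are hypothetical names, a plain `mutable.Map` stands in for the `OpenHashMap`, and I'm assuming that losing an output leaves its `mapId -> mapIndex` entry in place (which is what the scenario above relies on).
   
   ```scala
   import scala.collection.mutable

   // Hypothetical stand-in for MapStatus: only the field the check needs.
   final case class Status(mapId: Long)

   object StaleIndexSketch {
     // A single partition; field names mirror ShuffleStatus, but this is a toy model.
     val mapStatuses = new Array[Status](1)
     val mapStatusesDeleted = new Array[Status](1)
     // Assumption: a plain mutable.Map stands in for the OpenHashMap used in the PR.
     val mapIdToMapIndex = mutable.Map.empty[Long, Int]

     def add(mapIndex: Int, status: Status): Unit = {
       mapStatuses(mapIndex) = status
       mapIdToMapIndex(status.mapId) = mapIndex
     }

     // Assumption: losing an output moves its status into mapStatusesDeleted
     // and keeps the mapId -> mapIndex entry around.
     def lose(mapIndex: Int): Unit = {
       mapStatusesDeleted(mapIndex) = mapStatuses(mapIndex)
       mapStatuses(mapIndex) = null
     }

     // Lookup that trusts the index alone, as in the current diff.
     def lookupUnguarded(mapId: Long): Option[Status] =
       mapIdToMapIndex.get(mapId)
         .filter(i => mapStatuses(i) == null)
         .map(i => mapStatusesDeleted(i))

     // Lookup that also compares the deleted status's mapId, as in the suggestion.
     // The null check is purely defensive in this sketch.
     def lookupGuarded(mapId: Long): Option[Status] =
       lookupUnguarded(mapId).filter(s => s != null && s.mapId == mapId)

     def main(args: Array[String]): Unit = {
       val tid0 = 0L
       val tid1 = 1L
       add(0, Status(tid0)); lose(0) // first attempt computed, then lost
       add(0, Status(tid1)); lose(0) // recomputed, then lost again
       println(lookupUnguarded(tid0)) // yields the TID1 status for a TID0 lookup
       println(lookupGuarded(tid0))   // yields None, so the stale status is not reused
     }
   }
   ```
   
   With the guard, a recovered TID0 output simply falls through to whatever the "not found" handling is, instead of resurrecting the TID1 status under the wrong mapId.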



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

