Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2020/11/25 17:46:55 UTC

[GitHub] [spark] Victsm commented on a change in pull request #30480: [SPARK-32921][SHUFFLE][test-maven][test-hadoop2.7] MapOutputTracker extensions to support push-based shuffle

Victsm commented on a change in pull request #30480:
URL: https://github.com/apache/spark/pull/30480#discussion_r530549266



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -1011,4 +1333,47 @@ private[spark] object MapOutputTracker extends Logging {
 
     splitsByAddress.mapValues(_.toSeq).iterator
   }
+
+  /**
+   * Given a shuffle ID, a partition ID, an array of map statuses, and a bitmap corresponding
+   * to either a merged shuffle partition or a merged shuffle partition chunk, identify
+   * the metadata about the shuffle partition blocks that are merged into the merged shuffle
+   * partition or partition chunk represented by the bitmap.
+   *
+   * @param shuffleId Identifier for the shuffle
+   * @param partitionId The partition ID of the MergeStatus for which we look for the metadata
+   *                    of the merged shuffle partition blocks
+   * @param mapStatuses List of map statuses, indexed by map index
+   * @param tracker     bitmap containing mapIndexes that belong to the merged block or merged
+   *                    block chunk.
+   * @return A sequence of 2-item tuples, where the first item in the tuple is a BlockManagerId,
+   *         and the second item is a sequence of (shuffle block ID, shuffle block size,
+   *         map index) tuples describing the shuffle blocks that are stored at that block
+   *         manager.
+   */
+  def getMapStatusesForMergeStatus(
+      shuffleId: Int,
+      partitionId: Int,
+      mapStatuses: Array[MapStatus],
+      tracker: RoaringBitmap): Seq[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
+    assert (mapStatuses != null && tracker != null)
+    val splitsByAddress = new HashMap[BlockManagerId, ListBuffer[(BlockId, Long, Int)]]
+    for ((status, mapIndex) <- mapStatuses.zipWithIndex) {
+      // Only add blocks that are merged
+      if (tracker.contains(mapIndex)) {
+        MapOutputTracker.validateStatus(status, shuffleId, partitionId)
+        splitsByAddress.getOrElseUpdate(status.location, ListBuffer()) +=
+          ((ShuffleBlockId(shuffleId, status.mapId, partitionId),
+            status.getSizeForBlock(partitionId), mapIndex))
+      }
+    }
+    splitsByAddress.toSeq

Review comment:
       @dongjoon-hyun The Scala 2.13 compatibility issue should be fixed now. The build is still failing during Javadoc generation, though.
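For readers following the Scala 2.13 discussion, here is a minimal, self-contained sketch of the grouping logic in getMapStatusesForMergeStatus. It is not the code from the pull request, and the thread does not show the actual fix. Location, SimpleMapStatus, MergeStatusSketch and groupMergedBlocks are hypothetical stand-ins for the Spark types, and a Set[Int] replaces the RoaringBitmap. The sketch assumes the incompatibility came from returning mutable ListBuffer values where Seq is expected: in Scala 2.13, Seq means immutable.Seq, and a ListBuffer does not conform to it.

```scala
import scala.collection.mutable.{HashMap, ListBuffer}

// Hypothetical stand-ins for Spark's BlockManagerId and MapStatus (not the real classes).
final case class Location(host: String, port: Int)
final case class SimpleMapStatus(location: Location, mapId: Long, sizes: Array[Long])

object MergeStatusSketch {
  // For every map index present in `merged`, record (mapId, block size, mapIndex)
  // under the status's location. A Set[Int] stands in for the RoaringBitmap here.
  def groupMergedBlocks(
      partitionId: Int,
      mapStatuses: Array[SimpleMapStatus],
      merged: Set[Int]): Seq[(Location, Seq[(Long, Long, Int)])] = {
    require(mapStatuses != null && merged != null)
    val byLocation = new HashMap[Location, ListBuffer[(Long, Long, Int)]]
    for ((status, mapIndex) <- mapStatuses.zipWithIndex if merged.contains(mapIndex)) {
      byLocation.getOrElseUpdate(status.location, ListBuffer()) +=
        ((status.mapId, status.sizes(partitionId), mapIndex))
    }
    // Convert each mutable buffer explicitly. On Scala 2.13, returning
    // `byLocation.toSeq` directly would not type-check, because the values are
    // ListBuffers and the declared type expects an immutable Seq.
    byLocation.toSeq.map { case (loc, blocks) => (loc, blocks.toList) }
  }
}
```

Converting each buffer with toList keeps the declared return type valid on both Scala 2.12 and 2.13. The order of locations in the result follows HashMap iteration order and should not be relied on.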
