You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2021/03/06 07:07:45 UTC

[GitHub] [spark] hiboyang commented on a change in pull request #30004: [SPARK-33114][CORE] Add metadata in MapStatus to support custom shuffle manager

hiboyang commented on a change in pull request #30004:
URL: https://github.com/apache/spark/pull/30004#discussion_r588842660



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -774,6 +783,18 @@ private[spark] class MapOutputTrackerMaster(
     }
   }
 
+  def getAllMapOutputStatuses(shuffleId: Int): Array[MapStatus] = {
+    logDebug(s"Fetching all output statuses for shuffle $shuffleId")
+    shuffleStatuses.get(shuffleId) match {
+      case Some(shuffleStatus) =>
+        shuffleStatus.withMapStatuses { statuses =>
+          MapOutputTracker.checkMapStatuses(statuses, shuffleId)
+          statuses.clone

Review comment:
       Yes, got your point. How about change this method to `getAllMapOutputStatusMetadata` to only return the metadada?
   
   
   

##########
File path: core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
##########
@@ -171,6 +200,36 @@ class MapOutputTrackerSuite extends SparkFunSuite {
     mapWorkerRpcEnv.shutdown()
   }
 
+  test("remote get all map output statuses with metadata") {

Review comment:
       The previous test has `masterTracker.unregisterMapOutput` and some test verification for that, thus want to avoid adding too much for that test. Also this test is specifically testing non-null metadata object, kind of following "separation of concerns" to make it as a separate test.

##########
File path: core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
##########
@@ -213,6 +253,12 @@ private[spark] class HighlyCompressedMapStatus private (
       out.writeByte(kv._2)
     }
     out.writeLong(_mapTaskId)
+    if (_metadata.isEmpty) {

Review comment:
       Nice suggestion!

##########
File path: core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
##########
@@ -139,11 +164,19 @@ private[spark] class CompressedMapStatus(
 
   override def mapId: Long = _mapTaskId
 
+  override def metadata: Option[Serializable] = _metadata
+
   override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
     loc.writeExternal(out)
     out.writeInt(compressedSizes.length)
     out.write(compressedSizes)
     out.writeLong(_mapTaskId)
+    if (_metadata.isEmpty) {

Review comment:
       nice suggestion!

##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -827,6 +848,13 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
     }
   }
 
+  override def getAllMapOutputStatuses(shuffleId: Int): Array[MapStatus] = {
+    logDebug(s"Fetching all output statuses for shuffle $shuffleId")
+    val statuses = getStatuses(shuffleId, conf)

Review comment:
       yes, we only need to clear `mapStatuses` in `MapOutputTrackerWorker` , will add that

##########
File path: core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
##########
@@ -63,16 +63,37 @@ class MapOutputTrackerSuite extends SparkFunSuite {
     assert(tracker.containsShuffle(10))
     val size1000 = MapStatus.decompressSize(MapStatus.compressSize(1000L))
     val size10000 = MapStatus.decompressSize(MapStatus.compressSize(10000L))
-    tracker.registerMapOutput(10, 0, MapStatus(BlockManagerId("a", "hostA", 1000),
-        Array(1000L, 10000L), 5))
-    tracker.registerMapOutput(10, 1, MapStatus(BlockManagerId("b", "hostB", 1000),
-        Array(10000L, 1000L), 6))
+    val mapStatus1 = MapStatus(BlockManagerId("a", "hostA", 1000), Array(1000L, 10000L), 5)
+    val mapStatus2 = MapStatus(BlockManagerId("b", "hostB", 1000), Array(10000L, 1000L), 6)
+    tracker.registerMapOutput(10, 0, mapStatus1)
+    tracker.registerMapOutput(10, 1, mapStatus2)
     val statuses = tracker.getMapSizesByExecutorId(10, 0)
     assert(statuses.toSet ===
       Seq((BlockManagerId("a", "hostA", 1000),
         ArrayBuffer((ShuffleBlockId(10, 5, 0), size1000, 0))),
           (BlockManagerId("b", "hostB", 1000),
             ArrayBuffer((ShuffleBlockId(10, 6, 0), size10000, 1)))).toSet)
+    val allStatuses = tracker.getAllMapOutputStatuses(10)
+    assert(allStatuses.toSet === Set(mapStatus1, mapStatus2))

Review comment:
       good suggestion, will use Array to check the sequence as well

##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -1011,4 +1039,15 @@ private[spark] object MapOutputTracker extends Logging {
 
     splitsByAddress.mapValues(_.toSeq).iterator
   }
+
+  def checkMapStatuses(statuses: Array[MapStatus], shuffleId: Int): Unit = {
+    assert (statuses != null)
+    for (status <- statuses) {
+      if (status == null) {

Review comment:
       yes, good suggestion!

##########
File path: core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
##########
@@ -327,6 +388,7 @@ class MapOutputTrackerSuite extends SparkFunSuite {
                 (ShuffleBlockId(10, 6, 2), size1000, 1)))
         )
     )
+    assert(tracker.getAllMapOutputStatuses(10).toSet === Set(mapStatus1, mapStatus2))

Review comment:
       good catch!

##########
File path: core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
##########
@@ -147,13 +171,18 @@ class MapOutputTrackerSuite extends SparkFunSuite {
     intercept[FetchFailedException] { mapWorkerTracker.getMapSizesByExecutorId(10, 0) }
 
     val size1000 = MapStatus.decompressSize(MapStatus.compressSize(1000L))
-    masterTracker.registerMapOutput(10, 0, MapStatus(
-      BlockManagerId("a", "hostA", 1000), Array(1000L), 5))
+    val mapStatus = MapStatus(BlockManagerId("a", "hostA", 1000), Array(1000L), 5)
+    masterTracker.registerMapOutput(10, 0, mapStatus)
     mapWorkerTracker.updateEpoch(masterTracker.getEpoch)
     assert(mapWorkerTracker.getMapSizesByExecutorId(10, 0).toSeq ===
       Seq((BlockManagerId("a", "hostA", 1000),
         ArrayBuffer((ShuffleBlockId(10, 5, 0), size1000, 0)))))
-    assert(0 == masterTracker.getNumCachedSerializedBroadcast)
+    val allMapOutputStatuses = mapWorkerTracker.getAllMapOutputStatuses(10)
+    assert(allMapOutputStatuses.length === 1)
+    assert(allMapOutputStatuses(0).location === mapStatus.location)

Review comment:
       In responding of one of previous comments, I am suggesting returning only metadata instead of the whole map statue object. Will revisit here after that discussion.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org