Posted to reviews@spark.apache.org by "jerqi (via GitHub)" <gi...@apache.org> on 2023/03/08 13:12:02 UTC

[GitHub] [spark] jerqi opened a new pull request, #40339: [SPARK-42719][CORE] `MapOutputTracker#getMapLocation` should respect `spark.shuffle.reduceLocality.enabled`

jerqi opened a new pull request, #40339:
URL: https://github.com/apache/spark/pull/40339

   ### What changes were proposed in this pull request?
   `MapOutputTracker#getMapLocation` should respect `spark.shuffle.reduceLocality.enabled`.
   
   ### Why are the changes needed?
   
   See the discussion in https://github.com/apache/spark/pull/40307.
   
   Conceptually, `getPreferredLocations` in `ShuffledRowRDD` should return `Nil` immediately when `spark.shuffle.reduceLocality.enabled` is `false`.
   
   In practice, this logic is pushed down into `MapOutputTracker`: `getPreferredLocationsForShuffle` honors `spark.shuffle.reduceLocality.enabled`, but `getMapLocation` does not.
   
   The fix is therefore to make `getMapLocation` honor the parameter as well.
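The fix amounts to a guard at the top of `getMapLocation`. The sketch below is a simplified, self-contained model, not Spark's actual `MapOutputTrackerMaster`: `ToyTrackerMaster`, `preferredLocationsForShuffle`, `mapLocation`, and the per-map host array are hypothetical stand-ins for the real tracker methods and shuffle statuses. It only illustrates that, after the fix, both lookup paths return `Nil` when `spark.shuffle.reduceLocality.enabled` is `false`.

```scala
// Hypothetical, simplified model of MapOutputTrackerMaster's locality lookups (not Spark code).
// mapHosts(shuffleId)(mapIndex) is the host that ran that map task, or null if unknown.
class ToyTrackerMaster(shuffleLocalityEnabled: Boolean, mapHosts: Map[Int, Array[String]]) {

  // Analogue of getPreferredLocationsForShuffle, which already honored the flag.
  def preferredLocationsForShuffle(shuffleId: Int): Seq[String] =
    if (!shuffleLocalityEnabled) Nil
    else mapHosts.get(shuffleId).map(_.filter(_ != null).distinct.toSeq).getOrElse(Nil)

  // Analogue of getMapLocation after the fix: return Nil before touching any state.
  def mapLocation(shuffleId: Int, startMapIndex: Int, endMapIndex: Int): Seq[String] = {
    if (!shuffleLocalityEnabled) return Nil
    mapHosts.get(shuffleId) match {
      case Some(hs) if startMapIndex >= 0 && startMapIndex < endMapIndex && endMapIndex <= hs.length =>
        hs.slice(startMapIndex, endMapIndex).filter(_ != null).toSeq
      case _ => Nil
    }
  }
}

val hostsByShuffle = Map(10 -> Array("hostA", "hostB"))
val disabled = new ToyTrackerMaster(shuffleLocalityEnabled = false, mapHosts = hostsByShuffle)
val enabled = new ToyTrackerMaster(shuffleLocalityEnabled = true, mapHosts = hostsByShuffle)
println(disabled.mapLocation(10, 0, 1))  // List()
println(enabled.mapLocation(10, 0, 1))
```

Keeping the check inside the tracker method means every caller inherits the flag, rather than each call site in `ShuffledRowRDD` having to remember it.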
   
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   ### How was this patch tested?
   Added a new unit test in `MapOutputTrackerSuite`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on pull request #40339: [SPARK-42719][CORE] `MapOutputTracker#getMapLocation` should respect `spark.shuffle.reduceLocality.enabled`

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on PR #40339:
URL: https://github.com/apache/spark/pull/40339#issuecomment-1462498084

   Merged to master.
   Thanks for working on this @jerqi !
   Thanks for the reviews @cloud-fan, @LuciferYang, @advancedxy :-)



[GitHub] [spark] jerqi commented on pull request #40339: [SPARK-42719][CORE] `MapOutputTracker#getMapLocation` should respect `spark.shuffle.reduceLocality.enabled`

Posted by "jerqi (via GitHub)" <gi...@apache.org>.
jerqi commented on PR #40339:
URL: https://github.com/apache/spark/pull/40339#issuecomment-1461353825

   The test failure is unrelated; I retriggered the test.



[GitHub] [spark] cloud-fan commented on a diff in pull request #40339: [SPARK-42719][CORE] `MapOutputTracker#getMapLocation` should respect `spark.shuffle.reduceLocality.enabled`

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #40339:
URL: https://github.com/apache/spark/pull/40339#discussion_r1130589059


##########
core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala:
##########
@@ -1030,4 +1030,21 @@ class MapOutputTrackerSuite extends SparkFunSuite with LocalSparkContext {
     rpcEnv.shutdown()
     assert(npeCounter.intValue() == 0)
   }
+
+  test("SPARK-42719: `MapOutputTracker#getMapLocation` should respect the config option") {
+    val rpcEnv = createRpcEnv("test")
+    val newConf = new SparkConf
+    newConf.set(SHUFFLE_REDUCE_LOCALITY_ENABLE, false)
+    val tracker = newTrackerMaster(newConf)
+    tracker.trackerEndpoint = rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME,
+      new MapOutputTrackerMasterEndpoint(rpcEnv, tracker, newConf))
+    tracker.registerShuffle(10, 6, 1)
+    tracker.registerMapOutput(10, 0, MapStatus(BlockManagerId("a", "hostA", 1000),
+      Array(2L), 5))
+    val mockShuffleDep = mock(classOf[ShuffleDependency[Int, Int, _]])
+    when(mockShuffleDep.shuffleId).thenReturn(10)
+    assert(tracker.getMapLocation(mockShuffleDep, 0, 1) === Nil)
+    tracker.stop()

Review Comment:
   +1




[GitHub] [spark] mridulm commented on a diff in pull request #40339: [SPARK-42719][CORE] `MapOutputTracker#getMapLocation` should respect `spark.shuffle.reduceLocality.enabled`

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #40339:
URL: https://github.com/apache/spark/pull/40339#discussion_r1130360051


##########
core/src/main/scala/org/apache/spark/MapOutputTracker.scala:
##########
@@ -1113,7 +1113,7 @@ private[spark] class MapOutputTrackerMaster(
       endMapIndex: Int): Seq[String] =
   {
     val shuffleStatus = shuffleStatuses.get(dep.shuffleId).orNull
-    if (shuffleStatus != null) {
+    if (shuffleLocalityEnabled && shuffleStatus != null) {
       shuffleStatus.withMapStatuses { statuses =>

Review Comment:
   I actually meant this:
   
   ```
   if (!shuffleLocalityEnabled) return Nil
   val shuffleStatus = shuffleStatuses.get(dep.shuffleId).orNull
   if (shuffleStatus != null) {
   
   ```
   




[GitHub] [spark] jerqi commented on a diff in pull request #40339: [SPARK-42719][CORE] `MapOutputTracker#getMapLocation` should respect `spark.shuffle.reduceLocality.enabled`

Posted by "jerqi (via GitHub)" <gi...@apache.org>.
jerqi commented on code in PR #40339:
URL: https://github.com/apache/spark/pull/40339#discussion_r1130315958


##########
core/src/main/scala/org/apache/spark/MapOutputTracker.scala:
##########
@@ -1113,7 +1113,7 @@ private[spark] class MapOutputTrackerMaster(
       endMapIndex: Int): Seq[String] =
   {
     val shuffleStatus = shuffleStatuses.get(dep.shuffleId).orNull
-    if (shuffleStatus != null) {
+    if (shuffleStatus != null && shuffleLocalityEnabled) {

Review Comment:
   Ok, I will change it.




[GitHub] [spark] jerqi commented on a diff in pull request #40339: [SPARK-42719][CORE] `MapOutputTracker#getMapLocation` should respect `spark.shuffle.reduceLocality.enabled`

Posted by "jerqi (via GitHub)" <gi...@apache.org>.
jerqi commented on code in PR #40339:
URL: https://github.com/apache/spark/pull/40339#discussion_r1130592715


##########
core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala:
##########
@@ -1030,4 +1030,21 @@ class MapOutputTrackerSuite extends SparkFunSuite with LocalSparkContext {
     rpcEnv.shutdown()
     assert(npeCounter.intValue() == 0)
   }
+
+  test("SPARK-42719: `MapOutputTracker#getMapLocation` should respect the config option") {
+    val rpcEnv = createRpcEnv("test")
+    val newConf = new SparkConf
+    newConf.set(SHUFFLE_REDUCE_LOCALITY_ENABLE, false)
+    val tracker = newTrackerMaster(newConf)
+    tracker.trackerEndpoint = rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME,
+      new MapOutputTrackerMasterEndpoint(rpcEnv, tracker, newConf))
+    tracker.registerShuffle(10, 6, 1)
+    tracker.registerMapOutput(10, 0, MapStatus(BlockManagerId("a", "hostA", 1000),
+      Array(2L), 5))
+    val mockShuffleDep = mock(classOf[ShuffleDependency[Int, Int, _]])
+    when(mockShuffleDep.shuffleId).thenReturn(10)
+    assert(tracker.getMapLocation(mockShuffleDep, 0, 1) === Nil)
+    tracker.stop()

Review Comment:
   Added.




[GitHub] [spark] jerqi commented on a diff in pull request #40339: [SPARK-42719][CORE] `MapOutputTracker#getMapLocation` should respect `spark.shuffle.reduceLocality.enabled`

Posted by "jerqi (via GitHub)" <gi...@apache.org>.
jerqi commented on code in PR #40339:
URL: https://github.com/apache/spark/pull/40339#discussion_r1130364436


##########
core/src/main/scala/org/apache/spark/MapOutputTracker.scala:
##########
@@ -1113,7 +1113,7 @@ private[spark] class MapOutputTrackerMaster(
       endMapIndex: Int): Seq[String] =
   {
     val shuffleStatus = shuffleStatuses.get(dep.shuffleId).orNull
-    if (shuffleStatus != null) {
+    if (shuffleLocalityEnabled && shuffleStatus != null) {
       shuffleStatus.withMapStatuses { statuses =>

Review Comment:
   I got it.




[GitHub] [spark] mridulm commented on a diff in pull request #40339: [SPARK-42719][CORE] `MapOutputTracker#getMapLocation` should respect `spark.shuffle.reduceLocality.enabled`

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #40339:
URL: https://github.com/apache/spark/pull/40339#discussion_r1130313793


##########
core/src/main/scala/org/apache/spark/MapOutputTracker.scala:
##########
@@ -1113,7 +1113,7 @@ private[spark] class MapOutputTrackerMaster(
       endMapIndex: Int): Seq[String] =
   {
     val shuffleStatus = shuffleStatuses.get(dep.shuffleId).orNull
-    if (shuffleStatus != null) {
+    if (shuffleStatus != null && shuffleLocalityEnabled) {

Review Comment:
   nit: Why not skip the `shuffleStatuses` query entirely and return `Nil` immediately when `shuffleLocalityEnabled` is `false`?
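To illustrate the nit, the hypothetical sketch below counts map lookups. When the flag is checked after the lookup (`shuffleStatus != null && shuffleLocalityEnabled`), the lookup still runs with locality disabled; with an early `return Nil` it is skipped. `CountingStatuses`, `combinedCheck`, and `earlyReturn` are illustrative names, not Spark APIs, and a plain `Map[Int, String]` stands in for the real shuffle statuses.

```scala
// Hypothetical stand-in for the shuffleStatuses map that records how often it is queried.
class CountingStatuses(underlying: Map[Int, String]) {
  var lookups = 0
  def get(shuffleId: Int): Option[String] = {
    lookups += 1
    underlying.get(shuffleId)
  }
}

// Shape of the first revision: the lookup happens before the flag is consulted.
def combinedCheck(statuses: CountingStatuses, localityEnabled: Boolean, id: Int): Seq[String] = {
  val status = statuses.get(id).orNull
  if (status != null && localityEnabled) Seq(status) else Nil
}

// Shape suggested in review: bail out before touching the map at all.
def earlyReturn(statuses: CountingStatuses, localityEnabled: Boolean, id: Int): Seq[String] = {
  if (!localityEnabled) return Nil
  val status = statuses.get(id).orNull
  if (status != null) Seq(status) else Nil
}

val a = new CountingStatuses(Map(10 -> "hostA"))
combinedCheck(a, localityEnabled = false, id = 10)
val b = new CountingStatuses(Map(10 -> "hostA"))
earlyReturn(b, localityEnabled = false, id = 10)
println(s"combined: ${a.lookups}, early return: ${b.lookups}")  // combined: 1, early return: 0
```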




[GitHub] [spark] mridulm commented on pull request #40339: [SPARK-42719][CORE] `MapOutputTracker#getMapLocation` should respect `spark.shuffle.reduceLocality.enabled`

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on PR #40339:
URL: https://github.com/apache/spark/pull/40339#issuecomment-1461226978

   Will wait for the CI to succeed. Thanks for fixing this @jerqi !



[GitHub] [spark] mridulm closed pull request #40339: [SPARK-42719][CORE] `MapOutputTracker#getMapLocation` should respect `spark.shuffle.reduceLocality.enabled`

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm closed pull request #40339: [SPARK-42719][CORE] `MapOutputTracker#getMapLocation` should respect `spark.shuffle.reduceLocality.enabled`
URL: https://github.com/apache/spark/pull/40339



[GitHub] [spark] LuciferYang commented on a diff in pull request #40339: [SPARK-42719][CORE] `MapOutputTracker#getMapLocation` should respect `spark.shuffle.reduceLocality.enabled`

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #40339:
URL: https://github.com/apache/spark/pull/40339#discussion_r1130569496


##########
core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala:
##########
@@ -1030,4 +1030,21 @@ class MapOutputTrackerSuite extends SparkFunSuite with LocalSparkContext {
     rpcEnv.shutdown()
     assert(npeCounter.intValue() == 0)
   }
+
+  test("SPARK-42719: `MapOutputTracker#getMapLocation` should respect the config option") {
+    val rpcEnv = createRpcEnv("test")
+    val newConf = new SparkConf
+    newConf.set(SHUFFLE_REDUCE_LOCALITY_ENABLE, false)
+    val tracker = newTrackerMaster(newConf)
+    tracker.trackerEndpoint = rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME,
+      new MapOutputTrackerMasterEndpoint(rpcEnv, tracker, newConf))
+    tracker.registerShuffle(10, 6, 1)
+    tracker.registerMapOutput(10, 0, MapStatus(BlockManagerId("a", "hostA", 1000),
+      Array(2L), 5))
+    val mockShuffleDep = mock(classOf[ShuffleDependency[Int, Int, _]])
+    when(mockShuffleDep.shuffleId).thenReturn(10)
+    assert(tracker.getMapLocation(mockShuffleDep, 0, 1) === Nil)
+    tracker.stop()

Review Comment:
   I think we should always release resources in a `finally` block, even though other cases in this suite do not use `try`/`finally`.
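A minimal sketch of the suggested pattern, independent of Spark: `FakeTracker`, `FakeRpcEnv`, and `runCheck` are hypothetical stand-ins for the tracker and the `RpcEnv` created by the test. Because cleanup sits in `finally`, `stop()` and `shutdown()` still run when the assertion throws.

```scala
// Hypothetical resources standing in for MapOutputTrackerMaster and RpcEnv.
class FakeTracker { var stopped = false; def stop(): Unit = stopped = true }
class FakeRpcEnv { var shutDown = false; def shutdown(): Unit = shutDown = true }

// Runs a check; resources are released in `finally` whether or not the check passes.
def runCheck(tracker: FakeTracker, rpcEnv: FakeRpcEnv)(check: => Unit): Unit = {
  try {
    check
  } finally {
    tracker.stop()
    rpcEnv.shutdown()
  }
}

val tracker = new FakeTracker
val rpcEnv = new FakeRpcEnv
// Deliberately failing check: cleanup must still happen.
val failed = try {
  runCheck(tracker, rpcEnv) { assert(Seq("hostA") == Nil, "expected no preferred locations") }
  false
} catch {
  case _: AssertionError => true
}
println(s"failed=$failed stopped=${tracker.stopped} shutDown=${rpcEnv.shutDown}")
// failed=true stopped=true shutDown=true
```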




[GitHub] [spark] jerqi commented on pull request #40339: [SPARK-42719][CORE] `MapOutputTracker#getMapLocation` should respect `spark.shuffle.reduceLocality.enabled`

Posted by "jerqi (via GitHub)" <gi...@apache.org>.
jerqi commented on PR #40339:
URL: https://github.com/apache/spark/pull/40339#issuecomment-1461236436

   Thanks @mridulm!



[GitHub] [spark] mridulm commented on pull request #40339: [SPARK-42719][CORE] `MapOutputTracker#getMapLocation` should respect `spark.shuffle.reduceLocality.enabled`

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on PR #40339:
URL: https://github.com/apache/spark/pull/40339#issuecomment-1460624804

   Is this still in draft @jerqi ?
   Also, +CC @cloud-fan who reviewed this earlier.



[GitHub] [spark] jerqi commented on pull request #40339: [SPARK-42719][CORE] `MapOutputTracker#getMapLocation` should respect `spark.shuffle.reduceLocality.enabled`

Posted by "jerqi (via GitHub)" <gi...@apache.org>.
jerqi commented on PR #40339:
URL: https://github.com/apache/spark/pull/40339#issuecomment-1461148705

   > Is this still in draft @jerqi ? Also, +CC @cloud-fan who reviewed this earlier.
   
   Thanks @mridulm, it's ready for review.



[GitHub] [spark] wankunde commented on pull request #40339: [SPARK-42719][CORE] `MapOutputTracker#getMapLocation` should respect `spark.shuffle.reduceLocality.enabled`

Posted by "wankunde (via GitHub)" <gi...@apache.org>.
wankunde commented on PR #40339:
URL: https://github.com/apache/spark/pull/40339#issuecomment-1599100107

   We can reduce network I/O with local shuffle reads. Could we use a separate parameter for each kind of PartitionSpec?
   For example:
   ```scala
         case PartialReducerPartitionSpec(_, startMapIndex, endMapIndex, _) =>
           // Skew-split reducer partitions: governed by spark.shuffle.reduceLocality.enabled.
           if (!shuffleLocalityEnabled) return Nil
           tracker.getMapLocation(dependency, startMapIndex, endMapIndex)
   
         case PartialMapperPartitionSpec(mapIndex, _, _) =>
           // Local shuffle reads: governed by a new, separate flag (proposed).
           if (!localShuffleLocalityEnabled) return Nil
           tracker.getMapLocation(dependency, mapIndex, mapIndex + 1)
   
         case CoalescedMapperPartitionSpec(startMapIndex, endMapIndex, numReducers) =>
           if (!localShuffleLocalityEnabled) return Nil
           tracker.getMapLocation(dependency, startMapIndex, endMapIndex)
   ```
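The proposal can be modelled without Spark as below. This is only a sketch: the spec case classes are simplified, hypothetical stand-ins for Spark's partition spec classes, `localShuffleLocalityEnabled` is the new flag suggested in the comment (not an existing configuration), and the private `mapLocation` helper stands in for `MapOutputTracker#getMapLocation` without any flag check of its own.

```scala
// Simplified, hypothetical stand-ins for Spark's partition spec classes.
sealed trait Spec
final case class PartialReducerSpec(startMapIndex: Int, endMapIndex: Int) extends Spec
final case class PartialMapperSpec(mapIndex: Int) extends Spec
final case class CoalescedMapperSpec(startMapIndex: Int, endMapIndex: Int) extends Spec

// mapHosts(i) is the host that ran map task i.
class ToyLocality(
    mapHosts: IndexedSeq[String],
    shuffleLocalityEnabled: Boolean,
    localShuffleLocalityEnabled: Boolean) {

  // Stand-in for getMapLocation; flag checks happen at the call sites below.
  private def mapLocation(start: Int, end: Int): Seq[String] = mapHosts.slice(start, end)

  def preferredLocations(spec: Spec): Seq[String] = spec match {
    // Reducer-side split partitions: governed by the existing flag.
    case PartialReducerSpec(start, end) =>
      if (!shuffleLocalityEnabled) Nil else mapLocation(start, end)
    // Local shuffle reads: governed by the proposed separate flag.
    case PartialMapperSpec(mapIndex) =>
      if (!localShuffleLocalityEnabled) Nil else mapLocation(mapIndex, mapIndex + 1)
    case CoalescedMapperSpec(start, end) =>
      if (!localShuffleLocalityEnabled) Nil else mapLocation(start, end)
  }
}

val loc = new ToyLocality(Vector("hostA", "hostB", "hostC"),
  shuffleLocalityEnabled = false, localShuffleLocalityEnabled = true)
println(loc.preferredLocations(PartialReducerSpec(0, 2)))  // List()
println(loc.preferredLocations(PartialMapperSpec(1)))      // Vector(hostB)
```

Checking the flags at the call site, as proposed, lets each spec kind pick its own flag; the merged change instead puts the check inside `getMapLocation`, so every caller is governed by `spark.shuffle.reduceLocality.enabled`.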

