You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2021/09/27 22:46:42 UTC

[GitHub] [spark] venkata91 opened a new pull request #34122: [WIP][SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

venkata91 opened a new pull request #34122:
URL: https://github.com/apache/spark/pull/34122


   <!--
   Thanks for sending a pull request!  Here are some tips for you:
     1. If this is your first time, please read our contributor guidelines: https://spark.apache.org/contributing.html
     2. Ensure you have added or run the appropriate tests for your PR: https://spark.apache.org/developer-tools.html
     3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][SPARK-XXXX] Your PR title ...'.
     4. Be sure to keep the PR description updated to reflect all changes.
     5. Please write your PR title to summarize what this PR proposes.
     6. If possible, provide a concise example to reproduce the issue for a faster review.
     7. If you want to add a new configuration, please read the guideline first for naming configurations in
        'core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala'.
     8. If you want to add or modify an error type or message, please read the guideline first in
        'core/src/main/resources/error/README.md'.
   -->
   
   ### What changes were proposed in this pull request?
   <!--
   Please clarify what changes you are proposing. The purpose of this section is to outline the changes and how this PR fixes the issue. 
   If possible, please consider writing useful notes for better and faster reviews in your PR. See the examples below.
     1. If you refactor some codes with changing classes, showing the class hierarchy will help reviewers.
     2. If you fix some SQL features, you can provide some references of other DBMSes.
     3. If there is design documentation, please add the link.
     4. If there is a discussion in the mailing list, please add the link.
   -->
   Currently shuffle mergers are fetched before the start of the ShuffleMapStage. But for initial stages this can be problematic as shuffle mergers are nothing but unique hosts with shuffle services running which could be very few based on executors and this can cause merge ratio to be low. 
   
   With this approach, `ShuffleMapTask` query for merger locations if not available and if available and start using this for pushing the blocks. Since partitions are mapped uniquely to a merger location, it should be fine to not push for the earlier set of tasks. This should improve the merge ratio for even initial stages.
   
   Note: Currently this is in WIP because the changes are on top of SPARK-33701, once that gets merged will remove those changes updated it and remove WIP tag.
   
   ### Why are the changes needed?
   <!--
   Please clarify why the changes are needed. For instance,
     1. If you propose a new API, clarify the use case for a new API.
     2. If you fix a bug, you can clarify why it is a bug.
   -->
   Performance improvement. No new APIs change.
   
   ### Does this PR introduce _any_ user-facing change?
   <!--
   Note that it means *any* user-facing change including all aspects such as the documentation fix.
   If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible.
   If possible, please also clarify if this is a user-facing change compared to the released Spark versions or within the unreleased branches such as master.
   If no, write 'No'.
   -->
   No
   
   ### How was this patch tested?
   <!--
   If tests were added, say they were added here. Please make sure to add some test cases that check the changes thoroughly including negative and positive cases if possible.
   If it was tested in a way different from regular unit tests, please clarify how you tested step by step, ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future.
   If tests were not added, please describe why they were not added and/or why it was difficult to add.
   -->
   Added unit tests and also has been working in our internal production environment for a while now.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] venkata91 commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
venkata91 commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r792329331



##########
File path: core/src/main/scala/org/apache/spark/Dependency.scala
##########
@@ -135,6 +144,7 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
   def shuffleMergeId: Int = _shuffleMergeId
 
   def setMergerLocs(mergerLocs: Seq[BlockManagerId]): Unit = {
+    assert(shuffleMergeEnabled || shuffleMergeAllowed)

Review comment:
       Makes sense. I don't think we need `shuffleMergeEnabled` now with `shuffleMergeAllowed` and `mergerLocs.isEmpty` can achieve the same purpose. Only thing is earlier, we explicitly set `shuffleMergeEnabled` to false if `shuffleMergeFinalized` is set to true in the retry/reuse case.
   
   Would it make sense to clear `mergerLocs` in that case when `shuffleMergeAllowed` is set to false (case where the stage is determinate and also `shuffleMergeFinalized`)?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] venkata91 commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
venkata91 commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r794980463



##########
File path: core/src/main/scala/org/apache/spark/Dependency.scala
##########
@@ -145,20 +148,26 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
   }
 
   /**
-   * Returns true if push-based shuffle is disabled for this stage or empty RDD,
-   * or if the shuffle merge for this stage is finalized, i.e. the shuffle merge
-   * results for all partitions are available.
+   * Returns true if the RDD is an empty RDD or if the shuffle merge for this shuffle is
+   * finalized.
    */
   def shuffleMergeFinalized: Boolean = {
-    // Empty RDD won't be computed therefore shuffle merge finalized should be true by default.
-    if (shuffleMergeEnabled && numPartitions > 0) {
-      _shuffleMergedFinalized
+    _shuffleMergedFinalized
+  }

Review comment:
       Good call. Thanks for catching it. Addressed it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] venkata91 commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
venkata91 commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r794980463



##########
File path: core/src/main/scala/org/apache/spark/Dependency.scala
##########
@@ -145,20 +148,26 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
   }
 
   /**
-   * Returns true if push-based shuffle is disabled for this stage or empty RDD,
-   * or if the shuffle merge for this stage is finalized, i.e. the shuffle merge
-   * results for all partitions are available.
+   * Returns true if the RDD is an empty RDD or if the shuffle merge for this shuffle is
+   * finalized.
    */
   def shuffleMergeFinalized: Boolean = {
-    // Empty RDD won't be computed therefore shuffle merge finalized should be true by default.
-    if (shuffleMergeEnabled && numPartitions > 0) {
-      _shuffleMergedFinalized
+    _shuffleMergedFinalized
+  }

Review comment:
       Good call. Thanks for catching it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r793836705



##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -2608,8 +2608,12 @@ private[spark] class DAGScheduler(
     // isPushBasedShuffleEnabled and shuffleMergers need to be updated at the end.
     stage match {
       case s: ShuffleMapStage =>
-        stage.latestInfo.setPushBasedShuffleEnabled(s.shuffleDep.getMergerLocs.nonEmpty)
-        stage.latestInfo.setShuffleMergerCount(s.shuffleDep.getMergerLocs.size)
+        val isPushShuffleEnabled =
+          s.shuffleDep.shuffleMergeAllowed && s.shuffleDep.getMergerLocs.nonEmpty

Review comment:
       When will we have the case where mergerLocs is non empty, but merge is not allowed ?
   Is this for retry of DETERMINATE stage ? If yes, do we want to change definition of `def shuffleMergeEnabled` to be
   `shuffleMergeAllowed && mergerLocs.nonEmpty` ?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r792761973



##########
File path: core/src/main/scala/org/apache/spark/Dependency.scala
##########
@@ -135,6 +144,7 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
   def shuffleMergeId: Int = _shuffleMergeId
 
   def setMergerLocs(mergerLocs: Seq[BlockManagerId]): Unit = {
+    assert(shuffleMergeEnabled || shuffleMergeAllowed)

Review comment:
       Yes, when we want it to be disabled (like with retry of DETERMINATE stage which was already finalized), we must set merge allowed to false. It is cleaner.
   Note - `def isShuffleMergeEnabled` or `def shuffleMergeEnabled` pointing to `mergerLocs.nonEmpty` is fine (and preferred to using mergerLocs.<foo>) - we just dont need the state boolean.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r793834393



##########
File path: core/src/main/scala/org/apache/spark/Dependency.scala
##########
@@ -114,15 +114,6 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
 
   def shuffleMergeAllowed : Boolean = _shuffleMergeAllowed
 
-  // By default, shuffle merge is enabled for ShuffleDependency if shuffleMergeAllowed
-  private[this] var _shuffleMergeEnabled = shuffleMergeAllowed
-
-  private[spark] def setShuffleMergeEnabled(shuffleMergeEnabled: Boolean): Unit = {
-    _shuffleMergeEnabled = shuffleMergeEnabled
-  }
-
-  def shuffleMergeEnabled : Boolean = _shuffleMergeEnabled

Review comment:
       > In that case, should we also make shuffleMergeAllowed as well private[spark]. For now it is better to make everything private[spark] related to push-based shuffle right? Thoughts?
   
   `shuffleMergeAllowed` is similar to `shuffleMergeEnabled` - and has value as a developer api, which developers can query for - the setters dont.
   FinalizeTask on other hand is a purely internal impl detail.
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r780010060



##########
File path: core/src/main/scala/org/apache/spark/Dependency.scala
##########
@@ -144,12 +144,16 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
     _shuffleMergedFinalized = true
   }
 
+  def shuffleMergeFinalized: Boolean = {

Review comment:
       Let us split the state out into two variables - since they are fundamentally two things represented earlier by one boolean.
   
   * `shuffleMergeAllowed`
   * `shufflerMergeEnabled`
   
   If `shuffleMergeAllowed` is `false`, then all push based shuffle paths are ignored - and is statically set at driver.
   It defaults to `canShuffleMergeBeEnabled` - but can be explicitly disabled : for example, when missing tasks are being computed for a determinate shuffle after it has been finalized.
   This would be set at driver before the stage starts - and can be depended upon at executors to enable push based shuffle paths.
   
   `shufflerMergeEnabled` can be set to `true` only if `shuffleMergeAllowed` is `true` and additional conditions are met - for example, sufficient mergers are available.
   Earlier, this was determined statically when stage started - with this PR, this can become `true` adaptively after stage starts.
   `shuffleMergedFinalized` can remain as it was defined earlier.
   
   Thoughts ?
   
   +CC @Ngone51, @otterc 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] venkata91 commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
venkata91 commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r780451587



##########
File path: core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
##########
@@ -39,7 +39,9 @@ class StageInfo(
     val taskMetrics: TaskMetrics = null,
     private[spark] val taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty,
     private[spark] val shuffleDepId: Option[Int] = None,
-    val resourceProfileId: Int) {
+    val resourceProfileId: Int,
+    private[spark] var isPushBasedShuffleEnabled: Boolean = false,
+    private[spark] var shuffleMergerCount: Int = 0) {

Review comment:
       Yes, currently there are no consumers with in Spark. But [[SPARK-36620][SHUFFLE] Add client side push based shuffle metrics](https://github.com/apache/spark/pull/34000) has to use these metrics and populate it which is currently ongoing. Do you think we can move this change as well to that PR or wait for this one to get merged and then correspondingly make the changes in the other PR to consume these metrics? cc @thejdeep 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] venkata91 commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
venkata91 commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r797940920



##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -2313,7 +2326,7 @@ private[spark] class DAGScheduler(
     // Register merge statuses if the stage is still running and shuffle merge is not finalized yet.
     // TODO: SPARK-35549: Currently merge statuses results which come after shuffle merge
     // TODO: is finalized is not registered.
-    if (runningStages.contains(stage) && !stage.shuffleDep.shuffleMergeFinalized) {
+    if (runningStages.contains(stage) && !stage.shuffleDep.isShuffleMergeFinalizedMarked) {

Review comment:
       Filed one for this issue - https://issues.apache.org/jira/browse/SPARK-38092




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r796058549



##########
File path: core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
##########
@@ -131,7 +131,7 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager
       metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = {
     val baseShuffleHandle = handle.asInstanceOf[BaseShuffleHandle[K, _, C]]
     val (blocksByAddress, canEnableBatchFetch) =
-      if (baseShuffleHandle.dependency.shuffleMergeEnabled) {
+      if (baseShuffleHandle.dependency.isShuffleMergeFinalizedMarked) {

Review comment:
       Makes sense




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r792761973



##########
File path: core/src/main/scala/org/apache/spark/Dependency.scala
##########
@@ -135,6 +144,7 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
   def shuffleMergeId: Int = _shuffleMergeId
 
   def setMergerLocs(mergerLocs: Seq[BlockManagerId]): Unit = {
+    assert(shuffleMergeEnabled || shuffleMergeAllowed)

Review comment:
       Yes, when we want it to be disabled (like with retry of DETERMINATE stage which was already finalized), we must set merge allowed to false. It is cleaner.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r792766502



##########
File path: core/src/main/scala/org/apache/spark/Dependency.scala
##########
@@ -114,15 +114,6 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
 
   def shuffleMergeAllowed : Boolean = _shuffleMergeAllowed
 
-  // By default, shuffle merge is enabled for ShuffleDependency if shuffleMergeAllowed
-  private[this] var _shuffleMergeEnabled = shuffleMergeAllowed
-
-  private[spark] def setShuffleMergeEnabled(shuffleMergeEnabled: Boolean): Unit = {
-    _shuffleMergeEnabled = shuffleMergeEnabled
-  }
-
-  def shuffleMergeEnabled : Boolean = _shuffleMergeEnabled

Review comment:
       Let us keep this method - and make it
   `def shuffleMergeEnabled = mergerLocs.nonEmpty`
   What I meant was, let us remove the boolean `_shuffleMergeEnabled`
   
   So that rest of the code can depend on this as a state and not need to inspect the mergerLocs directly
   
   
   Also, this is part of public api - so cant remove it :-)
   Speaking of which, can you make `getFinalizeTask` `private[spark]` as well ? (this or different pr - `ShuffleDependency` is developer api and we cant leak impl details)
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] venkata91 commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
venkata91 commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r788287411



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -1364,6 +1430,7 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
   def unregisterShuffle(shuffleId: Int): Unit = {
     mapStatuses.remove(shuffleId)
     mergeStatuses.remove(shuffleId)
+    shufflePushMergerLocations.remove(shuffleId)

Review comment:
       Agree, that should be enough I think. Let me know if you think otherwise.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] venkata91 commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
venkata91 commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r785243416



##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -2487,6 +2501,21 @@ private[spark] class DAGScheduler(
       executorFailureEpoch -= execId
     }
     shuffleFileLostEpoch -= execId
+
+    if (pushBasedShuffleEnabled) {

Review comment:
       Briefly looked at this one, currently there isn't a way to communicate from `BlockManagerMasterEndpoint` to `DAGScheduler`. Also the state info is in 3 different places, `DAGScheduler` querying merger locations, `YarnSchedulerBackend` having the logic of desired num mergers and passing the `excludeNodes` info to `BlockManagerMasterEndpoint`. `BlockManagerMasterEndpoint` itself preferring active executors hosts first and the remaining merger locations from the dead executors host.
   
   Apart from this the number of `ExecutorAdded` events should be much smaller than the `TaskStart and TaskEnd` events, so this shouldn't be any worse than what we have already. Let me know your thoughts.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r794698873



##########
File path: core/src/main/scala/org/apache/spark/Dependency.scala
##########
@@ -114,15 +114,6 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
 
   def shuffleMergeAllowed : Boolean = _shuffleMergeAllowed
 
-  // By default, shuffle merge is enabled for ShuffleDependency if shuffleMergeAllowed
-  private[this] var _shuffleMergeEnabled = shuffleMergeAllowed
-
-  private[spark] def setShuffleMergeEnabled(shuffleMergeEnabled: Boolean): Unit = {
-    _shuffleMergeEnabled = shuffleMergeEnabled
-  }
-
-  def shuffleMergeEnabled : Boolean = _shuffleMergeEnabled

Review comment:
       @venkata91 Let us fix the changes from master, which are not in 3.2/older - and open a jira for the others as a follow up.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r792766502



##########
File path: core/src/main/scala/org/apache/spark/Dependency.scala
##########
@@ -114,15 +114,6 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
 
   def shuffleMergeAllowed : Boolean = _shuffleMergeAllowed
 
-  // By default, shuffle merge is enabled for ShuffleDependency if shuffleMergeAllowed
-  private[this] var _shuffleMergeEnabled = shuffleMergeAllowed
-
-  private[spark] def setShuffleMergeEnabled(shuffleMergeEnabled: Boolean): Unit = {
-    _shuffleMergeEnabled = shuffleMergeEnabled
-  }
-
-  def shuffleMergeEnabled : Boolean = _shuffleMergeEnabled

Review comment:
       Let us keep this method - and make it
   `def shuffleMergeEnabled = mergerLocs.nonEmpty`
   What I meant was, let us remove the boolean `_shuffleMergeEnabled`
   
   So that rest of the code can depend on this as a state and not need to inspect the mergerLocs directly
   
   
   Also, this is part of public api - so cant remove it :-)
   Speaking of which, can you make `getFinalizeTask`/`getFinalizeTask` `private[spark]` as well ? (`ShuffleDependency` is developer api and we cant leak impl details)
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
mridulm commented on pull request #34122:
URL: https://github.com/apache/spark/pull/34122#issuecomment-1022330045


   Can you take a look at the build failures @venkata91 ? Not sure if it requires to be updated to master.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] venkata91 commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
venkata91 commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r793016468



##########
File path: core/src/main/scala/org/apache/spark/Dependency.scala
##########
@@ -114,15 +114,6 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
 
   def shuffleMergeAllowed : Boolean = _shuffleMergeAllowed
 
-  // By default, shuffle merge is enabled for ShuffleDependency if shuffleMergeAllowed
-  private[this] var _shuffleMergeEnabled = shuffleMergeAllowed
-
-  private[spark] def setShuffleMergeEnabled(shuffleMergeEnabled: Boolean): Unit = {
-    _shuffleMergeEnabled = shuffleMergeEnabled
-  }
-
-  def shuffleMergeEnabled : Boolean = _shuffleMergeEnabled

Review comment:
       In that case, should we also make `shuffleMergeAllowed` as well `private[spark]`. For now it is better to make everything `private` related to push-based shuffle right? Thoughts?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] venkata91 commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
venkata91 commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r794981430



##########
File path: core/src/main/scala/org/apache/spark/Dependency.scala
##########
@@ -114,15 +114,6 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
 
   def shuffleMergeAllowed : Boolean = _shuffleMergeAllowed
 
-  // By default, shuffle merge is enabled for ShuffleDependency if shuffleMergeAllowed
-  private[this] var _shuffleMergeEnabled = shuffleMergeAllowed
-
-  private[spark] def setShuffleMergeEnabled(shuffleMergeEnabled: Boolean): Unit = {
-    _shuffleMergeEnabled = shuffleMergeEnabled
-  }
-
-  def shuffleMergeEnabled : Boolean = _shuffleMergeEnabled

Review comment:
       Created a separate JIRA to address this issue - https://issues.apache.org/jira/browse/SPARK-38064




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r794072564



##########
File path: core/src/main/scala/org/apache/spark/Dependency.scala
##########
@@ -114,15 +114,6 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
 
   def shuffleMergeAllowed : Boolean = _shuffleMergeAllowed
 
-  // By default, shuffle merge is enabled for ShuffleDependency if shuffleMergeAllowed
-  private[this] var _shuffleMergeEnabled = shuffleMergeAllowed
-
-  private[spark] def setShuffleMergeEnabled(shuffleMergeEnabled: Boolean): Unit = {
-    _shuffleMergeEnabled = shuffleMergeEnabled
-  }
-
-  def shuffleMergeEnabled : Boolean = _shuffleMergeEnabled

Review comment:
       Good call on `incPushCompleted`.
   Both `setMergerLocs` and `newShuffleMergeState` are in released versions, which makes it more difficult to make the change.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] venkata91 commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
venkata91 commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r792275865



##########
File path: core/src/main/scala/org/apache/spark/Dependency.scala
##########
@@ -144,12 +144,16 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
     _shuffleMergedFinalized = true
   }
 
+  def shuffleMergeFinalized: Boolean = {

Review comment:
       Yes, that is correct.
   As you mentioned above,
   
   - retry for an INDETERMINATE shuffle - `newShuffleMergeState` will set `_shuffleMergedFinalized` to false.
   - retry for a DETERMINATE shuffle 
     - `mergerLocs.nonEmpty` (merge enabled) -> `_shuffleMergeFinalized` would be `true` -> `shuffleMergeAllowed` -> false for retry/reuse cases.
     - `mergerLocs.isEmpty` (merge disabled) -> `_shuffleMergeFinalized` would be `false` -> `shuffleMergeAllowed` -> `true` for retry/reuse cases.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins removed a comment on pull request #34122: [WIP][SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #34122:
URL: https://github.com/apache/spark/pull/34122#issuecomment-928558795


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/48175/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #34122: [WIP][SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34122:
URL: https://github.com/apache/spark/pull/34122#issuecomment-928477169


   Kubernetes integration test starting
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/48175/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r780010060



##########
File path: core/src/main/scala/org/apache/spark/Dependency.scala
##########
@@ -144,12 +144,16 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
     _shuffleMergedFinalized = true
   }
 
+  def shuffleMergeFinalized: Boolean = {

Review comment:
       Let us split the state out into two variables - since they are fundamentally two things represented earlier by one boolean.
   
   * `shuffleMergeAllowed`
   * `shufflerMergeEnabled`
   
   If `shuffleMergeAllowed` is `false`, then all push based shuffle paths are ignored - and is statically set at driver.
   It defaults to `canShuffleMergeBeEnabled` - but can be explicitly disabled : for example, when missing tasks are being computed for a determinate shuffle after it has been finalized.
   This would be set at driver before the stage starts - and can be dependent upon at executors to enable push based shuffle paths.
   
   `shufflerMergeEnabled` can be set to `true` only if `shuffleMergeAllowed` is `true` and additional conditions are met - for example, sufficient mergers are available.
   Earlier, this was determined statically when stage started - with this PR, this can become `true` adaptively after stage starts.
   `shuffleMergedFinalized` can remain as it was defined earlier.
   
   Thoughts ?
   
   +CC @Ngone51, @otterc 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] venkata91 commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
venkata91 commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r779745813



##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -1371,19 +1371,33 @@ private[spark] class DAGScheduler(
   private def prepareShuffleServicesForShuffleMapStage(stage: ShuffleMapStage): Unit = {
     assert(stage.shuffleDep.shuffleMergeEnabled && !stage.shuffleDep.shuffleMergeFinalized)
     if (stage.shuffleDep.getMergerLocs.isEmpty) {
+      getAndSetShufflePushMergerLocations(stage)
+    }
+
+    if (stage.shuffleDep.shuffleMergeEnabled) {
+      logInfo(("Shuffle merge enabled before starting the stage for %s (%s) with %d" +
+        " merger locations").format(stage, stage.name, stage.shuffleDep.getMergerLocs.size))
+    } else {
+      logInfo(("Shuffle merge disabled for %s (%s), but can get enabled later" +
+        " adaptively once enough mergers are available").format(stage, stage.name))

Review comment:
       Yes, above are the only cases when it should be invoked.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] venkata91 commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
venkata91 commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r786236348



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -1176,6 +1223,9 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
   // instantiate a serializer. See the followup to SPARK-36705 for more details.
   private lazy val fetchMergeResult = Utils.isPushBasedShuffleEnabled(conf, isDriver = false)
 
+  // Exposed for testing
+  val shufflePushMergerLocations = new ConcurrentHashMap[Int, Seq[BlockManagerId]]().asScala

Review comment:
       Agree, good callout. This is definitely a potential issue with indeterminate stages. Thanks.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
mridulm commented on pull request #34122:
URL: https://github.com/apache/spark/pull/34122#issuecomment-1013812565


   The PR is looking mostly good, barring a few minor comments above - thanks for pushing through this @venkata91 !


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] venkata91 commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
venkata91 commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r792329758



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -1213,10 +1273,10 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
       useMergeResult: Boolean): MapSizesByExecutorId = {
     logDebug(s"Fetching outputs for shuffle $shuffleId")
     val (mapOutputStatuses, mergedOutputStatuses) = getStatuses(shuffleId, conf,
-      // EnableBatchFetch can be set to false during stage retry when the
-      // shuffleDependency.shuffleMergeEnabled is set to false, and Driver
+      // enableBatchFetch can be set to false during stage retry when the

Review comment:
       Fixed earlier usages of `shuffleMergeEnabled` appropriately and some minor typos.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r794072767



##########
File path: core/src/main/scala/org/apache/spark/Dependency.scala
##########
@@ -114,15 +114,6 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
 
   def shuffleMergeAllowed : Boolean = _shuffleMergeAllowed
 
-  // By default, shuffle merge is enabled for ShuffleDependency if shuffleMergeAllowed
-  private[this] var _shuffleMergeEnabled = shuffleMergeAllowed
-
-  private[spark] def setShuffleMergeEnabled(shuffleMergeEnabled: Boolean): Unit = {
-    _shuffleMergeEnabled = shuffleMergeEnabled
-  }
-
-  def shuffleMergeEnabled : Boolean = _shuffleMergeEnabled

Review comment:
       +CC @tgravescs, @Ngone51 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r797940406



##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -2313,7 +2326,7 @@ private[spark] class DAGScheduler(
     // Register merge statuses if the stage is still running and shuffle merge is not finalized yet.
     // TODO: SPARK-35549: Currently merge statuses results which come after shuffle merge
     // TODO: is finalized is not registered.
-    if (runningStages.contains(stage) && !stage.shuffleDep.shuffleMergeFinalized) {
+    if (runningStages.contains(stage) && !stage.shuffleDep.isShuffleMergeFinalizedMarked) {

Review comment:
       Agree with your analysis on both @venkata91 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #34122: [WIP][SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34122:
URL: https://github.com/apache/spark/pull/34122#issuecomment-928577662


   **[Test build #143662 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/143662/testReport)** for PR 34122 at commit [`13ad530`](https://github.com/apache/spark/commit/13ad530ca6ca574336f15192fab584fe6400cf24).
    * This patch **fails Spark unit tests**.
    * This patch merges cleanly.
    * This patch adds no public classes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA removed a comment on pull request #34122: [WIP][SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #34122:
URL: https://github.com/apache/spark/pull/34122#issuecomment-928425690


   **[Test build #143662 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/143662/testReport)** for PR 34122 at commit [`13ad530`](https://github.com/apache/spark/commit/13ad530ca6ca574336f15192fab584fe6400cf24).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] venkata91 commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
venkata91 commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r792275865



##########
File path: core/src/main/scala/org/apache/spark/Dependency.scala
##########
@@ -144,12 +144,16 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
     _shuffleMergedFinalized = true
   }
 
+  def shuffleMergeFinalized: Boolean = {

Review comment:
       Yes, that is correct.
   As you mentioned above,
   
   - retry for an INDETERMINATE shuffle - `newShuffleMergeState` will set `_shuffleMergedFinalized` to false.
   - retry for a DETERMINATE shuffle 
     - `mergerLocs.nonEmpty` (merge enabled) -> `_shuffleMergeFinalized` would be `true` -> `shuffleMergeAllowed` -> `false` for retry/reuse cases.
     - `mergerLocs.isEmpty` (merge disabled) -> `_shuffleMergeFinalized` would be `false` -> `shuffleMergeAllowed` -> `true` for retry/reuse cases.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r793836705



##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -2608,8 +2608,12 @@ private[spark] class DAGScheduler(
     // isPushBasedShuffleEnabled and shuffleMergers need to be updated at the end.
     stage match {
       case s: ShuffleMapStage =>
-        stage.latestInfo.setPushBasedShuffleEnabled(s.shuffleDep.getMergerLocs.nonEmpty)
-        stage.latestInfo.setShuffleMergerCount(s.shuffleDep.getMergerLocs.size)
+        val isPushShuffleEnabled =
+          s.shuffleDep.shuffleMergeAllowed && s.shuffleDep.getMergerLocs.nonEmpty

Review comment:
       When will we have the case where mergerLocs is non empty, but merge is not allowed ?
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] venkata91 commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
venkata91 commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r788286262



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -1176,6 +1223,9 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
   // instantiate a serializer. See the followup to SPARK-36705 for more details.
   private lazy val fetchMergeResult = Utils.isPushBasedShuffleEnabled(conf, isDriver = false)
 
+  // Exposed for testing
+  val shufflePushMergerLocations = new ConcurrentHashMap[Int, Seq[BlockManagerId]]().asScala

Review comment:
       1. For `MapOutputTrackerMaster`, instead of keeping track of both `<shuffleId, shuffleMergeId>` for shuffle merger locations, I think we can piggy back on `unregisterAllMapAndMergeOutput` to clear the `mergers` in`ShuffleStatus` 
   
   2. For `MapOutputTrackerWorker` based on this [comment](https://github.com/apache/spark/pull/34122#discussion_r785388799), we can clear the `shufflePushMergerLocations` in `updateEpoch`.
   
    This makes the code much simpler and neat. Let me know if you think otherwise or this has any other issues.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r785387856



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -1281,6 +1331,22 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
     }
   }
 
+  override def getShufflePushMergerLocations(shuffleId: Int): Seq[BlockManagerId] = {
+    shufflePushMergerLocations.getOrElse(shuffleId, getMergerLocations(shuffleId))
+  }
+
+  private def getMergerLocations(shuffleId: Int): Seq[BlockManagerId] = {
+    fetchingLock.withLock(shuffleId) {
+      val mergers = askTracker[Seq[BlockManagerId]](GetShufflePushMergerLocations(shuffleId))

Review comment:
       Check if `shufflePushMergerLocations(shuffleId)` is nonEmpty before fetching it again - in case of concurrent updates to before we acquire the lock for shuffleId
   (See `getStatuses` or other uses of `fetchingLock` for example).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r796929787



##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -1821,7 +1834,7 @@ private[spark] class DAGScheduler(
             }
 
             if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) {
-              if (!shuffleStage.shuffleDep.shuffleMergeFinalized &&
+              if (!shuffleStage.shuffleDep.isShuffleMergeFinalizedMarked &&
                 shuffleStage.shuffleDep.getMergerLocs.nonEmpty) {
                 checkAndScheduleShuffleMergeFinalize(shuffleStage)
               } else {

Review comment:
       In `processShuffleMapStageCompletion`, add something like 
   ```
       if (!shuffleStage.isIndeterminate && shuffleStage.shuffleDep.shuffleMergeEnabled) {
         shuffleStage.shuffleDep.setShuffleMergeAllowed(false)
       }
   ``` 
   to ensure we dont retry merge for determinate stages ?
   
   This is strictly not related to this PR, so I am fine with doing it in a follow up PR as well to keep the scope contained (we can a jira in that case).

##########
File path: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
##########
@@ -3671,7 +3671,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
 
     completeShuffleMapStageSuccessfully(0, 0, parts)
     val shuffleStage = scheduler.stageIdToStage(0).asInstanceOf[ShuffleMapStage]
-    assert(!shuffleStage.shuffleDep.shuffleMergeEnabled)
+    assert(shuffleStage.shuffleDep.mergerLocs.isEmpty)

Review comment:
       Looks like this was missed ?

##########
File path: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
##########
@@ -4147,7 +4146,206 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     assert(finalizeTask2.delay == 0 && finalizeTask2.registerMergeResults)
   }
 
-    /**
+  test("SPARK-34826: Adaptively fetch shuffle mergers") {
+    initPushBasedShuffleConfs(conf)
+    conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 2)
+    DAGSchedulerSuite.clearMergerLocs()
+    DAGSchedulerSuite.addMergerLocs(Seq("host1"))
+    val parts = 2
+
+    val shuffleMapRdd = new MyRDD(sc, parts, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts))
+    val reduceRdd = new MyRDD(sc, parts, List(shuffleDep), tracker = mapOutputTracker)
+
+    // Submit a reduce job that depends which will create a map stage
+    submit(reduceRdd, (0 until parts).toArray)
+
+    runEvent(makeCompletionEvent(
+      taskSets(0).tasks(0), Success, makeMapStatus("hostA", parts),
+      Seq.empty, Array.empty, createFakeTaskInfoWithId(0)))
+
+    val shuffleStage1 = scheduler.stageIdToStage(0).asInstanceOf[ShuffleMapStage]
+    assert(!shuffleStage1.shuffleDep.shuffleMergeEnabled)
+    assert(mapOutputTracker.getShufflePushMergerLocations(0).isEmpty)
+
+    DAGSchedulerSuite.addMergerLocs(Seq("host2", "host3"))
+
+    // host2 executor added event to trigger registering of shuffle merger locations
+    // as shuffle mergers are tracked separately for test
+    runEvent(ExecutorAdded("exec2", "host2"))
+
+    // Check if new shuffle merger locations are available for push or not
+    assert(mapOutputTracker.getShufflePushMergerLocations(0).size == 2)
+    assert(shuffleStage1.shuffleDep.getMergerLocs.size == 2)
+
+    // Complete remaining tasks in ShuffleMapStage 0
+    runEvent(makeCompletionEvent(taskSets(0).tasks(1), Success,
+      makeMapStatus("host1", parts), Seq.empty, Array.empty, createFakeTaskInfoWithId(1)))
+
+    completeNextResultStageWithSuccess(1, 0)
+    assert(results === Map(0 -> 42, 1 -> 42))
+
+    results.clear()
+    assertDataStructuresEmpty()
+  }
+
+  test("SPARK-34826: Adaptively fetch shuffle mergers with stage retry") {
+    initPushBasedShuffleConfs(conf)
+    conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 2)
+    DAGSchedulerSuite.clearMergerLocs()
+    DAGSchedulerSuite.addMergerLocs(Seq("host1"))
+    val parts = 2
+
+    val shuffleMapRdd1 = new MyRDD(sc, parts, Nil)
+    val shuffleDep1 = new ShuffleDependency(shuffleMapRdd1, new HashPartitioner(parts))
+    val shuffleMapRdd2 = new MyRDD(sc, parts, Nil)
+    val shuffleDep2 = new ShuffleDependency(shuffleMapRdd2, new HashPartitioner(parts))
+    val reduceRdd = new MyRDD(sc, parts, List(shuffleDep1, shuffleDep2),
+      tracker = mapOutputTracker)

Review comment:
       Let us keep them separate, and do it with the other jira if we decide to take that path.

##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -2313,7 +2326,7 @@ private[spark] class DAGScheduler(
     // Register merge statuses if the stage is still running and shuffle merge is not finalized yet.
     // TODO: SPARK-35549: Currently merge statuses results which come after shuffle merge
     // TODO: is finalized is not registered.
-    if (runningStages.contains(stage) && !stage.shuffleDep.shuffleMergeFinalized) {
+    if (runningStages.contains(stage) && !stage.shuffleDep.isShuffleMergeFinalizedMarked) {

Review comment:
       QQ:
   We are not checking if merge id is the same here.
   Consider case where `finalizeShuffleMerge` takes 10s or so - and we have stage cancellation/reexecution in the mean time.
   
   We should do this for both `RegisterMergeStatuses` and `ShuffleMergeFinalized`
   Thoughts ? (We can file a follow up jira for this if valid)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] venkata91 commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
venkata91 commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r792273604



##########
File path: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
##########
@@ -3686,14 +3686,13 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     completeNextStageWithFetchFailure(3, 0, shuffleDep)
     scheduler.resubmitFailedStages()
 
-    // Make sure shuffle merge is disabled for the retry
     val stage2 = scheduler.stageIdToStage(2).asInstanceOf[ShuffleMapStage]
-    assert(!stage2.shuffleDep.shuffleMergeEnabled)
+    assert(stage2.shuffleDep.shuffleMergeEnabled)

Review comment:
       This is due to the change we made [here](https://github.com/apache/spark/pull/34122/files#diff-85de35b2e85646ed499c545a3be1cd3ffd525a88aae835a9c621f877eebadcb6L1421-L1422) 
   
   Earlier if enough mergers are not available, we would set `shuffleMergeEnabled = false` and `shuffleMergeFinalized` would return true even though it is not merge finalized as shuffle merge was not enabled in the first place based on this [code](https://github.com/apache/spark/blob/f6128a6f4215dc45a19209d799dd9bf98fab6d8a/core/src/main/scala/org/apache/spark/Dependency.scala#L154). This is actually not quite right which is explained here as part of this [comment](https://github.com/apache/spark/pull/34122/files#r785245294). Since the `shuffleMergeFinalized` behavior is changed, in the retry `shuffleMergeEnabled` will be `true (or mergerLocs.nonEmpty)` as enough mergers are now available and shuffle merge is not finalized as shuffle was not merge enabled in the first attempt due to not enough mergers. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r799920430



##########
File path: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
##########
@@ -3671,7 +3671,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
 
     completeShuffleMapStageSuccessfully(0, 0, parts)
     val shuffleStage = scheduler.stageIdToStage(0).asInstanceOf[ShuffleMapStage]
-    assert(!shuffleStage.shuffleDep.shuffleMergeEnabled)
+    assert(shuffleStage.shuffleDep.mergerLocs.isEmpty)

Review comment:
       You are right, my bad.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] venkata91 commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
venkata91 commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r785245294



##########
File path: core/src/main/scala/org/apache/spark/Dependency.scala
##########
@@ -144,12 +144,16 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
     _shuffleMergedFinalized = true
   }
 
+  def shuffleMergeFinalized: Boolean = {

Review comment:
       I refactored this in to `shuffleMergeAllowed` and `shuffleMergeEnabled` to have a clear distinction  where `shuffleMergeAllowed` controls the static level of knobs like the following:
   1. `Is RDD barrier?`
   2. `Can Push shuffle be enabled?`
   3. `Disabling push shuffle for retry once the determinate stage attempt is finalized` etc.
   and `shuffleMergeEnabled` will be checked only when `shuffleMergeAllowed` is true along with that if sufficient mergers are available then it becomes `true`.
   
   Given all the above, we still require separate methods one for `isShuffleMergeFinalized` only checking for the `shuffleMergedFinalized` value and `numPartitions > 0`.  In the existing code, before the `ShuffleMapStage` starts we are checking `if (!shuffleMergeFinalized)` then only we are calling `prepareShuffleServicesForShuffleMapStage` but it is possible the previous stage never had `shuffleMergeEnabled` due to not enough mergers therefore even the retry also not be shuffle merge enabled, ideally this can be shuffle merge enabled if enough mergers are available. But for proceeding with the next stage, we need to check for `isShuffleMergeFinalizedIfEnabled` which is checking `if (shuffleMergeEnabled) isShuffleMergeFinalized else true` 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r779886030



##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -2487,6 +2501,21 @@ private[spark] class DAGScheduler(
       executorFailureEpoch -= execId
     }
     shuffleFileLostEpoch -= execId
+
+    if (pushBasedShuffleEnabled) {

Review comment:
       Ideally, we want to go into this `if` block only if a new host is added (since if executor is on existing/known hosts, number of mergers is not increasing) - more specifically, a new block manager in a new host is added (in `BlockManagerMasterEndpoint`).
   
   That is why I was thinking about doing it there ...




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r779886670



##########
File path: core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
##########
@@ -39,7 +39,9 @@ class StageInfo(
     val taskMetrics: TaskMetrics = null,
     private[spark] val taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty,
     private[spark] val shuffleDepId: Option[Int] = None,
-    val resourceProfileId: Int) {
+    val resourceProfileId: Int,
+    private[spark] var isPushBasedShuffleEnabled: Boolean = false,
+    private[spark] var shuffleMergerCount: Int = 0) {

Review comment:
       But they are `private[spark]` with no consumers within spark right now, no ?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] venkata91 commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
venkata91 commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r779749067



##########
File path: core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
##########
@@ -39,7 +39,9 @@ class StageInfo(
     val taskMetrics: TaskMetrics = null,
     private[spark] val taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty,
     private[spark] val shuffleDepId: Option[Int] = None,
-    val resourceProfileId: Int) {
+    val resourceProfileId: Int,
+    private[spark] var isPushBasedShuffleEnabled: Boolean = false,
+    private[spark] var shuffleMergerCount: Int = 0) {

Review comment:
       Added this, so that we can get the information whether a stage is push enabled if so what is the number of shuffleMergers through the stage API? Should we split this out from this PR?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r785387151



##########
File path: core/src/main/scala/org/apache/spark/Dependency.scala
##########
@@ -145,19 +155,30 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
   }
 
   /**
-   * Returns true if push-based shuffle is disabled for this stage or empty RDD,
-   * or if the shuffle merge for this stage is finalized, i.e. the shuffle merge
-   * results for all partitions are available.
+   * Returns true if the RDD is an empty RDD or if the shuffle merge for this shuffle is
+   * finalized.
    */
-  def shuffleMergeFinalized: Boolean = {
+  def isShuffleMergeFinalized: Boolean = {
     // Empty RDD won't be computed therefore shuffle merge finalized should be true by default.
-    if (shuffleMergeEnabled && numPartitions > 0) {
+    if (numPartitions > 0) {

Review comment:
       Since `canShuffleMergeBeEnabled` already checks for `numPartition > 0`, do we need this check here ?

##########
File path: core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
##########
@@ -910,4 +910,32 @@ class MapOutputTrackerSuite extends SparkFunSuite with LocalSparkContext {
     rpcEnv.shutdown()
     slaveRpcEnv.shutdown()
   }
+
+  test("SPARK-34826: Adaptive shuffle mergers") {
+    val newConf = new SparkConf
+    newConf.set("spark.shuffle.push.based.enabled", "true")
+    newConf.set("spark.shuffle.service.enabled", "true")
+
+    // needs TorrentBroadcast so need a SparkContext
+    withSpark(new SparkContext("local", "MapOutputTrackerSuite", newConf)) { sc =>
+      val masterTracker = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
+      val rpcEnv = sc.env.rpcEnv
+      val masterEndpoint = new MapOutputTrackerMasterEndpoint(rpcEnv, masterTracker, newConf)
+      rpcEnv.stop(masterTracker.trackerEndpoint)
+      rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME, masterEndpoint)
+
+      val slaveTracker = new MapOutputTrackerWorker(newConf)
+      slaveTracker.trackerEndpoint =
+        rpcEnv.setupEndpointRef(rpcEnv.address, MapOutputTracker.ENDPOINT_NAME)
+
+      masterTracker.registerShuffle(20, 100, 100)
+      slaveTracker.updateEpoch(masterTracker.getEpoch)
+      val mergerLocs = (1 to 10).map(x => BlockManagerId(s"exec-$x", s"host-$x", 7337))
+      masterTracker.registerShufflePushMergerLocations(20, mergerLocs)
+
+      assert(slaveTracker.getShufflePushMergerLocations(20).size == 10)
+      slaveTracker.unregisterShuffle(20)
+      assert(slaveTracker.shufflePushMergerLocations.isEmpty)

Review comment:
       Note: We would need to enhance this test with the additional case mentioned above for `shufflePushMergerLocations`.

##########
File path: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
##########
@@ -4147,7 +4146,128 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     assert(finalizeTask2.delay == 0 && finalizeTask2.registerMergeResults)
   }
 
-    /**
+  test("SPARK-34826: Adaptively fetch shuffle mergers") {
+    initPushBasedShuffleConfs(conf)
+    conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 6)
+    DAGSchedulerSuite.clearMergerLocs
+    DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", "host5"))
+    val parts = 7
+
+    val shuffleMapRdd = new MyRDD(sc, parts, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts))
+    val reduceRdd = new MyRDD(sc, parts, List(shuffleDep), tracker = mapOutputTracker)
+
+    // Submit a reduce job that depends which will create a map stage
+    submit(reduceRdd, (0 until parts).toArray)
+
+    runEvent(makeCompletionEvent(
+      taskSets(0).tasks(0), Success, makeMapStatus("hostA", parts),
+      Seq.empty, Array.empty, createFakeTaskInfoWithId(0)))
+
+    val shuffleStage1 = scheduler.stageIdToStage(0).asInstanceOf[ShuffleMapStage]
+    assert(shuffleStage1.shuffleDep.getMergerLocs.isEmpty)
+    assert(mapOutputTracker.getShufflePushMergerLocations(0).isEmpty)
+
+    DAGSchedulerSuite.addMergerLocs(Seq("host6", "host7", "host8"))
+
+    runEvent(makeCompletionEvent(
+      taskSets(0).tasks(1), Success, makeMapStatus("hostA", parts),
+      Seq.empty, Array.empty, createFakeTaskInfoWithId(1)))
+
+    // Dummy executor added event to trigger registering of shuffle merger locations
+    // as shuffle mergers are tracked separately for test
+    runEvent(ExecutorAdded("dummy", "dummy"))

Review comment:
       Let us make the host valid (say `host6` for example: from the new set of hosts) - in case this code evolves in future.

##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -1281,6 +1331,22 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
     }
   }
 
+  override def getShufflePushMergerLocations(shuffleId: Int): Seq[BlockManagerId] = {
+    shufflePushMergerLocations.getOrElse(shuffleId, getMergerLocations(shuffleId))
+  }
+
+  private def getMergerLocations(shuffleId: Int): Seq[BlockManagerId] = {
+    fetchingLock.withLock(shuffleId) {
+      val mergers = askTracker[Seq[BlockManagerId]](GetShufflePushMergerLocations(shuffleId))

Review comment:
       Check if `shufflePushMergerLocations(shuffleId)` is nonEmpty before fetching it again - in case of concurrent updates to before we acquire the lock for shuffleId
   (See `getStatuses` or other uses of `fetchingLock` for how).

##########
File path: core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
##########
@@ -910,4 +910,32 @@ class MapOutputTrackerSuite extends SparkFunSuite with LocalSparkContext {
     rpcEnv.shutdown()
     slaveRpcEnv.shutdown()
   }
+
+  test("SPARK-34826: Adaptive shuffle mergers") {
+    val newConf = new SparkConf
+    newConf.set("spark.shuffle.push.based.enabled", "true")
+    newConf.set("spark.shuffle.service.enabled", "true")
+
+    // needs TorrentBroadcast so need a SparkContext
+    withSpark(new SparkContext("local", "MapOutputTrackerSuite", newConf)) { sc =>
+      val masterTracker = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
+      val rpcEnv = sc.env.rpcEnv
+      val masterEndpoint = new MapOutputTrackerMasterEndpoint(rpcEnv, masterTracker, newConf)
+      rpcEnv.stop(masterTracker.trackerEndpoint)
+      rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME, masterEndpoint)
+
+      val slaveTracker = new MapOutputTrackerWorker(newConf)

Review comment:
       rename as `workerTracker`
   
   Looks like some of the other push based shuffle PR's have introduced this term again - we should rename them to `worker` as well.
   Can you pls file a jira under the improvement tasks for this ? Thx.

##########
File path: core/src/main/scala/org/apache/spark/shuffle/ShuffleWriteProcessor.scala
##########
@@ -59,13 +59,24 @@ private[spark] class ShuffleWriteProcessor extends Serializable with Logging {
         rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
       val mapStatus = writer.stop(success = true)
       if (mapStatus.isDefined) {
+        // Check if sufficient shuffle mergers are available now for the ShuffleMapTask to push
+        if (dep.shuffleMergeAllowed && dep.getMergerLocs.isEmpty && !dep.isShuffleMergeFinalized) {

Review comment:
       We cannot have both `dep.getMergerLocs.isEmpty == true` and `dep.isShuffleMergeFinalized == false`.
   Drop `dep.isShuffleMergeFinalized`  here ?

##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -1364,6 +1430,7 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
   def unregisterShuffle(shuffleId: Int): Unit = {
     mapStatuses.remove(shuffleId)
     mergeStatuses.remove(shuffleId)
+    shufflePushMergerLocations.remove(shuffleId)

Review comment:
       Review Note: I was trying to see if `shufflePushMergerLocations.clear` in `updateEpoch` was sufficient to handle the shuffle attempt id issue above - but I am not so sure if that is enough.

##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -1176,6 +1223,9 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
   // instantiate a serializer. See the followup to SPARK-36705 for more details.
   private lazy val fetchMergeResult = Utils.isPushBasedShuffleEnabled(conf, isDriver = false)
 
+  // Exposed for testing
+  val shufflePushMergerLocations = new ConcurrentHashMap[Int, Seq[BlockManagerId]]().asScala

Review comment:
       This needs to account for shuffle merge id as well - to ensure the merger locations are consistent.
   Possible flow is:
   a) attempt 0 has enough mergers, and so executors cache value for attempt 0 mergers.
   b) one or more hosts get blacklisted before attempt 1.
   c) attempt 1 starts, with merge allowed = true, but merge enabled = false - so tasks start without mergers populated in dependency (merge might get enabled as more executors are added).
   d) executors which ran attempt 0 will use mergers from previous attempt - new executors will use mergers from attempt 1.
   

##########
File path: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
##########
@@ -4147,7 +4146,128 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     assert(finalizeTask2.delay == 0 && finalizeTask2.registerMergeResults)
   }
 
-    /**
+  test("SPARK-34826: Adaptively fetch shuffle mergers") {
+    initPushBasedShuffleConfs(conf)
+    conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 6)
+    DAGSchedulerSuite.clearMergerLocs

Review comment:
       nit: This should be `clearMergerLocs()` given it is changing state - though not introduced in this PR.

##########
File path: core/src/main/scala/org/apache/spark/Dependency.scala
##########
@@ -106,7 +106,17 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
 
   // By default, shuffle merge is enabled for ShuffleDependency if push based shuffle
   // is enabled
-  private[this] var _shuffleMergeEnabled = canShuffleMergeBeEnabled()
+  private[this] var _shuffleMergeAllowed = canShuffleMergeBeEnabled()
+
+  private[spark] def setShuffleMergeAllowed(shuffleMergeAllowed: Boolean): Unit = {
+    _shuffleMergeAllowed = shuffleMergeAllowed
+  }
+
+  def shuffleMergeAllowed : Boolean = _shuffleMergeAllowed
+
+  // By default, shuffle merge is enabled for ShuffleDependency if push based shuffle
+  // is enabled
+  private[this] var _shuffleMergeEnabled = shuffleMergeAllowed
 
   private[spark] def setShuffleMergeEnabled(shuffleMergeEnabled: Boolean): Unit = {
     _shuffleMergeEnabled = shuffleMergeEnabled

Review comment:
       Do we need this boolean ? Or will `mergerLocs.nonEmpty` not do ? (`def shuffleMergeEnabled : Boolean = mergerLocs.nonEmpty`)
   
   Also, in `setShuffleMergeEnabled ` (if we are keeping it) or in `setMergerLocs`, add`assert (! shuffleMergeEnabled  || shuffleMergeAllowed)`

##########
File path: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
##########
@@ -4147,7 +4146,128 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     assert(finalizeTask2.delay == 0 && finalizeTask2.registerMergeResults)
   }
 
-    /**
+  test("SPARK-34826: Adaptively fetch shuffle mergers") {
+    initPushBasedShuffleConfs(conf)
+    conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 6)
+    DAGSchedulerSuite.clearMergerLocs
+    DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", "host5"))
+    val parts = 7
+
+    val shuffleMapRdd = new MyRDD(sc, parts, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts))
+    val reduceRdd = new MyRDD(sc, parts, List(shuffleDep), tracker = mapOutputTracker)
+
+    // Submit a reduce job that depends which will create a map stage
+    submit(reduceRdd, (0 until parts).toArray)
+
+    runEvent(makeCompletionEvent(
+      taskSets(0).tasks(0), Success, makeMapStatus("hostA", parts),
+      Seq.empty, Array.empty, createFakeTaskInfoWithId(0)))
+
+    val shuffleStage1 = scheduler.stageIdToStage(0).asInstanceOf[ShuffleMapStage]
+    assert(shuffleStage1.shuffleDep.getMergerLocs.isEmpty)
+    assert(mapOutputTracker.getShufflePushMergerLocations(0).isEmpty)
+
+    DAGSchedulerSuite.addMergerLocs(Seq("host6", "host7", "host8"))
+
+    runEvent(makeCompletionEvent(
+      taskSets(0).tasks(1), Success, makeMapStatus("hostA", parts),
+      Seq.empty, Array.empty, createFakeTaskInfoWithId(1)))
+
+    // Dummy executor added event to trigger registering of shuffle merger locations
+    // as shuffle mergers are tracked separately for test
+    runEvent(ExecutorAdded("dummy", "dummy"))
+
+    // Check if new shuffle merger locations are available for push or not
+    assert(mapOutputTracker.getShufflePushMergerLocations(0).size == 7)

Review comment:
       Shouldn't this not be `6` ?

##########
File path: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
##########
@@ -4147,7 +4146,128 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     assert(finalizeTask2.delay == 0 && finalizeTask2.registerMergeResults)
   }
 
-    /**
+  test("SPARK-34826: Adaptively fetch shuffle mergers") {
+    initPushBasedShuffleConfs(conf)
+    conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 6)
+    DAGSchedulerSuite.clearMergerLocs
+    DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", "host5"))
+    val parts = 7
+
+    val shuffleMapRdd = new MyRDD(sc, parts, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts))
+    val reduceRdd = new MyRDD(sc, parts, List(shuffleDep), tracker = mapOutputTracker)
+
+    // Submit a reduce job that depends which will create a map stage
+    submit(reduceRdd, (0 until parts).toArray)
+
+    runEvent(makeCompletionEvent(
+      taskSets(0).tasks(0), Success, makeMapStatus("hostA", parts),
+      Seq.empty, Array.empty, createFakeTaskInfoWithId(0)))
+
+    val shuffleStage1 = scheduler.stageIdToStage(0).asInstanceOf[ShuffleMapStage]
+    assert(shuffleStage1.shuffleDep.getMergerLocs.isEmpty)
+    assert(mapOutputTracker.getShufflePushMergerLocations(0).isEmpty)
+
+    DAGSchedulerSuite.addMergerLocs(Seq("host6", "host7", "host8"))
+
+    runEvent(makeCompletionEvent(
+      taskSets(0).tasks(1), Success, makeMapStatus("hostA", parts),
+      Seq.empty, Array.empty, createFakeTaskInfoWithId(1)))

Review comment:
       I am trying to understand why task`(1)` was completed and `addMergerLocs` is called before that.
   Any comments ? Thx
   
   (I understand that `addMergerLocs` needs to be called to make mergers >= 6, but why this specific flow ?)

##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -2487,6 +2501,21 @@ private[spark] class DAGScheduler(
       executorFailureEpoch -= execId
     }
     shuffleFileLostEpoch -= execId
+
+    if (pushBasedShuffleEnabled) {
+      // Only set merger locations for stages that are not yet finished and have empty mergers
+      shuffleIdToMapStage.filter { case (_, stage) =>

Review comment:
       Scratch that, the cost is minimal at best after the re-order of filter checks below.

##########
File path: core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
##########
@@ -39,7 +39,9 @@ class StageInfo(
     val taskMetrics: TaskMetrics = null,
     private[spark] val taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty,
     private[spark] val shuffleDepId: Option[Int] = None,
-    val resourceProfileId: Int) {
+    val resourceProfileId: Int,
+    private[spark] var isPushBasedShuffleEnabled: Boolean = false,
+    private[spark] var shuffleMergerCount: Int = 0) {

Review comment:
       Sounds good, that PR is currently not consuming it - but if we are updating to also surface this, makes sense to include it.
   @thejdeep would be great if you can update #34000 once this is merged (or in follow up if #34000 gets merged before this PR). Thx !

##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -2487,6 +2501,21 @@ private[spark] class DAGScheduler(
       executorFailureEpoch -= execId
     }
     shuffleFileLostEpoch -= execId
+
+    if (pushBasedShuffleEnabled) {

Review comment:
       Thanks for clarifying, makes sense.

##########
File path: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
##########
@@ -4147,7 +4146,128 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     assert(finalizeTask2.delay == 0 && finalizeTask2.registerMergeResults)
   }
 
-    /**
+  test("SPARK-34826: Adaptively fetch shuffle mergers") {
+    initPushBasedShuffleConfs(conf)
+    conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 6)
+    DAGSchedulerSuite.clearMergerLocs
+    DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", "host5"))
+    val parts = 7
+
+    val shuffleMapRdd = new MyRDD(sc, parts, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts))
+    val reduceRdd = new MyRDD(sc, parts, List(shuffleDep), tracker = mapOutputTracker)
+
+    // Submit a reduce job that depends which will create a map stage
+    submit(reduceRdd, (0 until parts).toArray)
+
+    runEvent(makeCompletionEvent(
+      taskSets(0).tasks(0), Success, makeMapStatus("hostA", parts),
+      Seq.empty, Array.empty, createFakeTaskInfoWithId(0)))
+
+    val shuffleStage1 = scheduler.stageIdToStage(0).asInstanceOf[ShuffleMapStage]
+    assert(shuffleStage1.shuffleDep.getMergerLocs.isEmpty)
+    assert(mapOutputTracker.getShufflePushMergerLocations(0).isEmpty)
+
+    DAGSchedulerSuite.addMergerLocs(Seq("host6", "host7", "host8"))
+
+    runEvent(makeCompletionEvent(
+      taskSets(0).tasks(1), Success, makeMapStatus("hostA", parts),
+      Seq.empty, Array.empty, createFakeTaskInfoWithId(1)))
+
+    // Dummy executor added event to trigger registering of shuffle merger locations
+    // as shuffle mergers are tracked separately for test
+    runEvent(ExecutorAdded("dummy", "dummy"))
+
+    // Check if new shuffle merger locations are available for push or not
+    assert(mapOutputTracker.getShufflePushMergerLocations(0).size == 7)
+    assert(shuffleStage1.shuffleDep.getMergerLocs.size == 7)
+
+    // Complete remaining tasks in ShuffleMapStage 0
+    (2 to 6).foreach(x => {
+      runEvent(makeCompletionEvent(taskSets(0).tasks(x), Success, makeMapStatus("hostA", parts),
+        Seq.empty, Array.empty, createFakeTaskInfoWithId(x)))
+    })
+
+    completeNextResultStageWithSuccess(1, 0)
+    assert(results === Map(0 -> 42, 1 -> 42, 2 -> 42, 3 -> 42, 4 -> 42, 5 -> 42, 6 -> 42))
+
+    results.clear()
+    assertDataStructuresEmpty()
+  }
+
+  test("SPARK-34826: Adaptively fetch shuffle mergers with stage retry") {
+    initPushBasedShuffleConfs(conf)
+    conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 6)
+    DAGSchedulerSuite.clearMergerLocs
+    DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", "host5"))
+    val parts = 7
+
+    val shuffleMapRdd1 = new MyRDD(sc, parts, Nil)
+    val shuffleDep1 = new ShuffleDependency(shuffleMapRdd1, new HashPartitioner(parts))
+    val shuffleMapRdd2 = new MyRDD(sc, parts, Nil)
+    val shuffleDep2 = new ShuffleDependency(shuffleMapRdd2, new HashPartitioner(parts))
+    val reduceRdd = new MyRDD(sc, parts, List(shuffleDep1, shuffleDep2),
+      tracker = mapOutputTracker)
+
+    // Submit a reduce job that depends which will create a map stage
+    submit(reduceRdd, (0 until parts).toArray)
+
+    val taskResults = taskSets(0).tasks.zipWithIndex.map {
+      case (_, idx) =>
+        (Success, makeMapStatus("host" + ('A' + idx).toChar, parts))
+    }.toSeq
+
+    val shuffleStage1 = scheduler.stageIdToStage(0).asInstanceOf[ShuffleMapStage]
+    DAGSchedulerSuite.addMergerLocs(Seq("host6", "host7", "host8"))
+    // Dummy executor added event to trigger registering of shuffle merger locations
+    // as shuffle mergers are tracked separately for test
+    runEvent(ExecutorAdded("dummy", "dummy"))
+    // Check if new shuffle merger locations are available for push or not
+    assert(mapOutputTracker.getShufflePushMergerLocations(0).size == 7)
+    assert(shuffleStage1.shuffleDep.getMergerLocs.size == 7)
+    val mergerLocsBeforeRetry = shuffleStage1.shuffleDep.getMergerLocs
+
+    // Remove MapStatus on one of the host before the stage ends to trigger
+    // a scenario where stage 0 needs to be resubmitted upon finishing all tasks.
+    // Merge finalization should be scheduled in this case.
+    for ((result, i) <- taskResults.zipWithIndex) {
+      if (i == taskSets(0).tasks.size - 1) {
+        mapOutputTracker.removeOutputsOnHost("hostA")
+      }
+      if (i < taskSets(0).tasks.size) {
+        runEvent(makeCompletionEvent(taskSets(0).tasks(i), result._1, result._2))
+      }
+    }
+
+    assert(shuffleStage1.shuffleDep.isShuffleMergeFinalized)
+
+    // Successfully completing the retry of stage 0.
+    complete(taskSets(2), taskSets(2).tasks.zipWithIndex.map {
+      case (_, idx) =>
+        (Success, makeMapStatus("host" + ('A' + idx).toChar, parts))
+    }.toSeq)
+
+    DAGSchedulerSuite.addMergerLocs(Seq("host9", "host10"))
+    // Dummy executor added event to trigger registering of shuffle merger locations
+    runEvent(ExecutorAdded("dummy1", "dummy1"))
+    assert(shuffleStage1.shuffleDep.getMergerLocs.size == 7)
+    assert(shuffleStage1.shuffleDep.isShuffleMergeFinalized)
+    complete(taskSets(1), taskSets(1).tasks.zipWithIndex.map {
+      case (_, idx) =>
+        (Success, makeMapStatus("host" + ('A' + idx).toChar, parts, 10))
+    }.toSeq)
+    val shuffleStage2 = scheduler.stageIdToStage(1).asInstanceOf[ShuffleMapStage]
+    // Shuffle merger locs should not be refreshed as the shuffle is already finalized
+    assert(mergerLocsBeforeRetry.sortBy(_.host) ===
+      shuffleStage1.shuffleDep.getMergerLocs.sortBy(_.host))

Review comment:
       We need to test for both DETERMINATE stages and INDETERMINATE stages.
   We should force a new set of mergers for INDETERMINATE stage retry - In a 'real world case', this would mean adding a previously selected merger to the excludeNodes - but for the test, we can simply clear `DAGSchedulerSuite .mergerLocs` and add a different set of hosts there.

##########
File path: core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
##########
@@ -131,7 +131,7 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager
       metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = {
     val baseShuffleHandle = handle.asInstanceOf[BaseShuffleHandle[K, _, C]]
     val (blocksByAddress, canEnableBatchFetch) =
-      if (baseShuffleHandle.dependency.shuffleMergeEnabled) {
+      if (baseShuffleHandle.dependency.isShuffleMergeFinalized) {

Review comment:
       Thanks @venkata91 !




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] venkata91 commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
venkata91 commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r785246582



##########
File path: core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
##########
@@ -131,7 +131,7 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager
       metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = {
     val baseShuffleHandle = handle.asInstanceOf[BaseShuffleHandle[K, _, C]]
     val (blocksByAddress, canEnableBatchFetch) =
-      if (baseShuffleHandle.dependency.shuffleMergeEnabled) {
+      if (baseShuffleHandle.dependency.isShuffleMergeFinalized) {

Review comment:
       This would be a bug although not related to this PR. But better to get this fixed. Will add a unit test as well to cover this case. Let me know if anyone thinks, it is better to fix this as part of a separate JIRA. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r792766502



##########
File path: core/src/main/scala/org/apache/spark/Dependency.scala
##########
@@ -114,15 +114,6 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
 
   def shuffleMergeAllowed : Boolean = _shuffleMergeAllowed
 
-  // By default, shuffle merge is enabled for ShuffleDependency if shuffleMergeAllowed
-  private[this] var _shuffleMergeEnabled = shuffleMergeAllowed
-
-  private[spark] def setShuffleMergeEnabled(shuffleMergeEnabled: Boolean): Unit = {
-    _shuffleMergeEnabled = shuffleMergeEnabled
-  }
-
-  def shuffleMergeEnabled : Boolean = _shuffleMergeEnabled

Review comment:
       Let us keep this method - and make it
   `def shuffleMergeEnabled = mergerLocs.nonEmpty`
   What I meant was, let us remove the boolean `_shuffleMergeEnabled`
   
   So that rest of the code can depend on this as a state and not need to inspect the mergerLocs directly

##########
File path: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
##########
@@ -3579,7 +3579,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     submit(reduceRdd, (0 until parts).toArray)
     completeShuffleMapStageSuccessfully(0, 0, parts)

Review comment:
       Once we reintroduce the `shuffleMergeEnabled` method, all changes to this file (DAGSchedulerSuite) related to mergerLocs.isEmpty == !shuffleMergeEnabled (and reverse) can be reverted (for the latest [diff](https://github.com/apache/spark/pull/34122/commits/3b52bed7ea4bbdb127a4711201d8c8268ab06650#))
   

##########
File path: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
##########
@@ -4172,7 +4172,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
 
     // host2 executor added event to trigger registering of shuffle merger locations
     // as shuffle mergers are tracked separately for test
-    runEvent(ExecutorAdded("host2", "host2"))
+    runEvent(ExecutorAdded("host2", "exec2"))

Review comment:
       Order is reverse - `ExecutorAdded(execId: String, host: String)`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r793833288



##########
File path: core/src/main/scala/org/apache/spark/Dependency.scala
##########
@@ -114,15 +114,6 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
 
   def shuffleMergeAllowed : Boolean = _shuffleMergeAllowed
 
-  // By default, shuffle merge is enabled for ShuffleDependency if shuffleMergeAllowed
-  private[this] var _shuffleMergeEnabled = shuffleMergeAllowed
-
-  private[spark] def setShuffleMergeEnabled(shuffleMergeEnabled: Boolean): Unit = {
-    _shuffleMergeEnabled = shuffleMergeEnabled
-  }
-
-  def shuffleMergeEnabled : Boolean = _shuffleMergeEnabled

Review comment:
       > We are using mergerLocs.isEmpty in other places as well. If we use shuffleMergeEnabled (mergerLocs.nonEmpty) and mergerLocs.isEmpty|nonEmpty in few other places, wouldn't it be confusing?
   
   Yes, we should not blindly replace - only when the intent is to check if shuffle merge is enabled; though I would expect a empty/nonEmpty check against mergerLocs to pretty much be a proxy for is shuffle merge enabled.
   For example, the changes in the tests - the earlier intent was to check if merge was enabled; which got replaced with checks against mergerLocs
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r794700609



##########
File path: core/src/main/scala/org/apache/spark/Dependency.scala
##########
@@ -160,14 +160,14 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
    * this shuffle is finalized.
    */
   def isShuffleMergeFinalizedIfEnabled: Boolean = {
-    if (mergerLocs.nonEmpty) {
+    if (shuffleMergeEnabled) {
       shuffleMergeFinalized
     } else {
       true
     }
   }
 
-  def newShuffleMergeState(): Unit = {
+  private[spark] def newShuffleMergeState(): Unit = {

Review comment:
       This was in 3.2, let us move this change to a separate jira




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r794718066



##########
File path: core/src/main/scala/org/apache/spark/Dependency.scala
##########
@@ -145,20 +148,26 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
   }
 
   /**
-   * Returns true if push-based shuffle is disabled for this stage or empty RDD,
-   * or if the shuffle merge for this stage is finalized, i.e. the shuffle merge
-   * results for all partitions are available.
+   * Returns true if the RDD is an empty RDD or if the shuffle merge for this shuffle is
+   * finalized.
    */
   def shuffleMergeFinalized: Boolean = {
-    // Empty RDD won't be computed therefore shuffle merge finalized should be true by default.
-    if (shuffleMergeEnabled && numPartitions > 0) {
-      _shuffleMergedFinalized
+    _shuffleMergedFinalized
+  }

Review comment:
       Thinking more, we are changing semantics of `shuffleMergeFinalized` here, which was publically exposed.
   What used to be `shuffleMergeFinalized` is now becoming `isShuffleMergeFinalizedIfEnabled` - so the behavior is exposed by another method.
   
   We should preserve the earlier semantics for the method ... 
   For this PR, that would mean:
   
   1) Rename `shuffleMergeFinalized` as `isShuffleMergeFinalized` (a private[spark] method).
   
   2) Rename `isShuffleMergeFinalizedIfEnabled` to `shuffleMergeFinalized` - which will preserve the earlier behavior as well.
   
   Thoughts ?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r794997274



##########
File path: core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
##########
@@ -855,7 +855,7 @@ class MapOutputTrackerSuite extends SparkFunSuite with LocalSparkContext {
     rpcEnv.shutdown()
   }
 
-  test("SPARK-37023: Avoid fetching merge status when shuffleMergeEnabled is false") {
+  test("SPARK-37023: Avoid fetching merge status when isShuffleMergeFinalizedMarked is false") {

Review comment:
       nit: isShuffleMergeFinalizedMarked -> useMergeResult ?
   That is what we are testing here actually (this test went through some changes as code evolved, but looks like test name was missed).

##########
File path: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
##########
@@ -4051,8 +4050,8 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     runEvent(StageCancelled(0, Option("Explicit cancel check")))
     scheduler.handleShuffleMergeFinalized(shuffleStage1, shuffleStage1.shuffleDep.shuffleMergeId)
 
-    assert(shuffleStage1.shuffleDep.shuffleMergeEnabled)
-    assert(!shuffleStage1.shuffleDep.shuffleMergeFinalized)
+    assert(shuffleStage1.shuffleDep.mergerLocs.nonEmpty)
+    assert(!shuffleStage1.shuffleDep.isShuffleMergeFinalizedMarked)

Review comment:
       nit: revert ?

##########
File path: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
##########
@@ -4147,7 +4146,206 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     assert(finalizeTask2.delay == 0 && finalizeTask2.registerMergeResults)
   }
 
-    /**
+  test("SPARK-34826: Adaptively fetch shuffle mergers") {
+    initPushBasedShuffleConfs(conf)
+    conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 2)
+    DAGSchedulerSuite.clearMergerLocs()
+    DAGSchedulerSuite.addMergerLocs(Seq("host1"))
+    val parts = 2
+
+    val shuffleMapRdd = new MyRDD(sc, parts, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts))
+    val reduceRdd = new MyRDD(sc, parts, List(shuffleDep), tracker = mapOutputTracker)
+
+    // Submit a reduce job that depends which will create a map stage
+    submit(reduceRdd, (0 until parts).toArray)
+
+    runEvent(makeCompletionEvent(
+      taskSets(0).tasks(0), Success, makeMapStatus("hostA", parts),
+      Seq.empty, Array.empty, createFakeTaskInfoWithId(0)))
+
+    val shuffleStage1 = scheduler.stageIdToStage(0).asInstanceOf[ShuffleMapStage]
+    assert(!shuffleStage1.shuffleDep.shuffleMergeEnabled)
+    assert(mapOutputTracker.getShufflePushMergerLocations(0).isEmpty)
+
+    DAGSchedulerSuite.addMergerLocs(Seq("host2", "host3"))
+
+    // host2 executor added event to trigger registering of shuffle merger locations
+    // as shuffle mergers are tracked separately for test
+    runEvent(ExecutorAdded("exec2", "host2"))
+
+    // Check if new shuffle merger locations are available for push or not
+    assert(mapOutputTracker.getShufflePushMergerLocations(0).size == 2)
+    assert(shuffleStage1.shuffleDep.getMergerLocs.size == 2)
+
+    // Complete remaining tasks in ShuffleMapStage 0
+    runEvent(makeCompletionEvent(taskSets(0).tasks(1), Success,
+      makeMapStatus("host1", parts), Seq.empty, Array.empty, createFakeTaskInfoWithId(1)))
+
+    completeNextResultStageWithSuccess(1, 0)
+    assert(results === Map(0 -> 42, 1 -> 42))
+
+    results.clear()
+    assertDataStructuresEmpty()
+  }
+
+  test("SPARK-34826: Adaptively fetch shuffle mergers with stage retry") {
+    initPushBasedShuffleConfs(conf)
+    conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 2)
+    DAGSchedulerSuite.clearMergerLocs()
+    DAGSchedulerSuite.addMergerLocs(Seq("host1"))
+    val parts = 2
+
+    val shuffleMapRdd1 = new MyRDD(sc, parts, Nil)
+    val shuffleDep1 = new ShuffleDependency(shuffleMapRdd1, new HashPartitioner(parts))
+    val shuffleMapRdd2 = new MyRDD(sc, parts, Nil)
+    val shuffleDep2 = new ShuffleDependency(shuffleMapRdd2, new HashPartitioner(parts))
+    val reduceRdd = new MyRDD(sc, parts, List(shuffleDep1, shuffleDep2),
+      tracker = mapOutputTracker)

Review comment:
       Based on discussion [above](https://github.com/apache/spark/pull/34122/files#r790376895), we might need to end up merging this test with the next ("SPARK-34826: Adaptively fetch shuffle mergers with stage retry for indeterminate stage").

##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -1281,6 +1341,33 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
     }
   }
 
+  override def getShufflePushMergerLocations(shuffleId: Int): Seq[BlockManagerId] = {
+    shufflePushMergerLocations.getOrElse(shuffleId, getMergerLocations(shuffleId))
+  }
+
+  private def getMergerLocations(shuffleId: Int): Seq[BlockManagerId] = {
+    val mergers = shufflePushMergerLocations.get(shuffleId).orNull
+    if (mergers == null) {

Review comment:
       nit:
   This (get and null check) is not required - we are invoking this method only when `shufflePushMergerLocations` does not have an entry.

##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -728,7 +728,8 @@ private[spark] class DAGScheduler(
                 // Mark mapStage as available with shuffle outputs only after shuffle merge is
                 // finalized with push based shuffle. If not, subsequent ShuffleMapStage won't
                 // read from merged output as the MergeStatuses are not available.
-                if (!mapStage.isAvailable || !mapStage.shuffleDep.shuffleMergeFinalized) {
+                if (!mapStage.isAvailable ||
+                  !mapStage.shuffleDep.shuffleMergeFinalized) {

Review comment:
       nit: revert ?

##########
File path: core/src/main/scala/org/apache/spark/shuffle/ShuffleWriteProcessor.scala
##########
@@ -59,13 +59,25 @@ private[spark] class ShuffleWriteProcessor extends Serializable with Logging {
         rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
       val mapStatus = writer.stop(success = true)
       if (mapStatus.isDefined) {
+        // Check if sufficient shuffle mergers are available now for the ShuffleMapTask to push
+        if (dep.shuffleMergeAllowed && dep.getMergerLocs.isEmpty) {
+          val mapOutputTracker = SparkEnv.get.mapOutputTracker
+          val mergerLocs =
+            mapOutputTracker.getShufflePushMergerLocations(dep.shuffleId)
+          if (mergerLocs.nonEmpty) {
+            dep.setMergerLocs(mergerLocs)
+          }
+        }
         // Initiate shuffle push process if push based shuffle is enabled
         // The map task only takes care of converting the shuffle data file into multiple
         // block push requests. It delegates pushing the blocks to a different thread-pool -
         // ShuffleBlockPusher.BLOCK_PUSHER_POOL.
-        if (dep.shuffleMergeEnabled && dep.getMergerLocs.nonEmpty && !dep.shuffleMergeFinalized) {
+        if (dep.getMergerLocs.nonEmpty && !dep.isShuffleMergeFinalizedMarked) {

Review comment:
       With the recent changes, wont `!dep.shuffleMergeFinalized` not be sufficient ?

##########
File path: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
##########
@@ -4082,7 +4081,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     assert(shuffleIndeterminateStage.isIndeterminate)
     scheduler.handleShuffleMergeFinalized(shuffleIndeterminateStage, 2)
     assert(shuffleIndeterminateStage.shuffleDep.shuffleMergeEnabled)
-    assert(!shuffleIndeterminateStage.shuffleDep.shuffleMergeFinalized)
+    assert(!shuffleIndeterminateStage.shuffleDep.isShuffleMergeFinalizedMarked)
   }

Review comment:
       nit: revert ?

##########
File path: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
##########
@@ -3926,7 +3925,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     val finalizeTask1 = shuffleStage1.shuffleDep.getFinalizeTask.get
       .asInstanceOf[DummyScheduledFuture]
     assert(finalizeTask1.delay == 10 && finalizeTask1.registerMergeResults)
-    assert(shuffleStage1.shuffleDep.shuffleMergeFinalized)
+    assert(shuffleStage1.shuffleDep.isShuffleMergeFinalizedMarked)

Review comment:
       nit: revert ?

##########
File path: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
##########
@@ -3613,8 +3613,8 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     val shuffleStage2 = scheduler.stageIdToStage(1).asInstanceOf[ShuffleMapStage]
     assert(shuffleStage2.shuffleDep.getMergerLocs.nonEmpty)
 
-    assert(shuffleStage2.shuffleDep.shuffleMergeFinalized)
-    assert(shuffleStage1.shuffleDep.shuffleMergeFinalized)
+    assert(shuffleStage2.shuffleDep.isShuffleMergeFinalizedMarked)
+    assert(shuffleStage1.shuffleDep.isShuffleMergeFinalizedMarked)

Review comment:
       nit: Revert these ?

##########
File path: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
##########
@@ -3671,7 +3671,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
 
     completeShuffleMapStageSuccessfully(0, 0, parts)
     val shuffleStage = scheduler.stageIdToStage(0).asInstanceOf[ShuffleMapStage]
-    assert(!shuffleStage.shuffleDep.shuffleMergeEnabled)
+    assert(shuffleStage.shuffleDep.mergerLocs.isEmpty)

Review comment:
       check for `!shuffleMergeAllowed` in addition to empty mergers here ?

##########
File path: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
##########
@@ -4130,7 +4129,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     }
     val shuffleStage1 = scheduler.stageIdToStage(0).asInstanceOf[ShuffleMapStage]
     assert(shuffleStage1.shuffleDep.shuffleMergeEnabled)
-    assert(!shuffleStage1.shuffleDep.shuffleMergeFinalized)
+    assert(!shuffleStage1.shuffleDep.isShuffleMergeFinalizedMarked)

Review comment:
       nit: revert ?

##########
File path: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
##########
@@ -3686,14 +3686,13 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     completeNextStageWithFetchFailure(3, 0, shuffleDep)
     scheduler.resubmitFailedStages()
 
-    // Make sure shuffle merge is disabled for the retry
     val stage2 = scheduler.stageIdToStage(2).asInstanceOf[ShuffleMapStage]
-    assert(!stage2.shuffleDep.shuffleMergeEnabled)
+    assert(stage2.shuffleDep.shuffleMergeEnabled)

Review comment:
       I was thinking more about this ... should we always mark DETERMINATE stages with `setShuffleMergeAllowed(false)` when they complete ?
   It makes things more cleaner, and in line with expectation whether mergers were used for first execution or not.
   
   Thoughts ?

##########
File path: core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
##########
@@ -131,7 +131,7 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager
       metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = {
     val baseShuffleHandle = handle.asInstanceOf[BaseShuffleHandle[K, _, C]]
     val (blocksByAddress, canEnableBatchFetch) =
-      if (baseShuffleHandle.dependency.shuffleMergeEnabled) {
+      if (baseShuffleHandle.dependency.isShuffleMergeFinalizedMarked) {

Review comment:
       Given the changes, should this be `shuffleMergeEnabled && isShuffleMergeFinalizedMarked ` ?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
mridulm commented on pull request #34122:
URL: https://github.com/apache/spark/pull/34122#issuecomment-1032871866


   Merged to master.
   Thanks for working on this @venkata91 !


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r796060753



##########
File path: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
##########
@@ -3613,8 +3613,8 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     val shuffleStage2 = scheduler.stageIdToStage(1).asInstanceOf[ShuffleMapStage]
     assert(shuffleStage2.shuffleDep.getMergerLocs.nonEmpty)
 
-    assert(shuffleStage2.shuffleDep.shuffleMergeFinalized)
-    assert(shuffleStage1.shuffleDep.shuffleMergeFinalized)
+    assert(shuffleStage2.shuffleDep.isShuffleMergeFinalizedMarked)
+    assert(shuffleStage1.shuffleDep.isShuffleMergeFinalizedMarked)

Review comment:
       I am slightly on the fence for some of these test reverts, since we are doing a stronger check now.
   So I am fine with leaving them as-is as well.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] venkata91 commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
venkata91 commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r796195590



##########
File path: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
##########
@@ -4130,7 +4129,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     }
     val shuffleStage1 = scheduler.stageIdToStage(0).asInstanceOf[ShuffleMapStage]
     assert(shuffleStage1.shuffleDep.shuffleMergeEnabled)
-    assert(!shuffleStage1.shuffleDep.shuffleMergeFinalized)
+    assert(!shuffleStage1.shuffleDep.isShuffleMergeFinalizedMarked)

Review comment:
       Same as above.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] venkata91 commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
venkata91 commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r799178501



##########
File path: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
##########
@@ -3671,7 +3671,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
 
     completeShuffleMapStageSuccessfully(0, 0, parts)
     val shuffleStage = scheduler.stageIdToStage(0).asInstanceOf[ShuffleMapStage]
-    assert(!shuffleStage.shuffleDep.shuffleMergeEnabled)
+    assert(shuffleStage.shuffleDep.mergerLocs.isEmpty)

Review comment:
       I think here the `shuffleMergeAllowed` won't be false. Here the mergers are not set due to not enough mergers, so it doesn't take push based shuffle path.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] venkata91 commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
venkata91 commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r797872918



##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -2313,7 +2326,7 @@ private[spark] class DAGScheduler(
     // Register merge statuses if the stage is still running and shuffle merge is not finalized yet.
     // TODO: SPARK-35549: Currently merge statuses results which come after shuffle merge
     // TODO: is finalized is not registered.
-    if (runningStages.contains(stage) && !stage.shuffleDep.shuffleMergeFinalized) {
+    if (runningStages.contains(stage) && !stage.shuffleDep.isShuffleMergeFinalizedMarked) {

Review comment:
       @mridulm If `shuffleMergeId` is not same we are not finalizing the merge right? But there could be a `finalize` request for the correct `shuffleMergeId` but we could have messed up the `MergeStatuses` in that case coming from a `shuffleMergeId`. Ok makes sense. Will file a separate JIRA for this one as well.
   
   On a side note, this is also another reason why we can `setShuffleMergeAllowed(false)` for determinate stages in `handleShuffleMergeFinalized` because it is possible that we might not finalize due to different `shuffleMergeId` finalize request.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] venkata91 commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
venkata91 commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r780451587



##########
File path: core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
##########
@@ -39,7 +39,9 @@ class StageInfo(
     val taskMetrics: TaskMetrics = null,
     private[spark] val taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty,
     private[spark] val shuffleDepId: Option[Int] = None,
-    val resourceProfileId: Int) {
+    val resourceProfileId: Int,
+    private[spark] var isPushBasedShuffleEnabled: Boolean = false,
+    private[spark] var shuffleMergerCount: Int = 0) {

Review comment:
       Yes, currently there are no consumers with in Spark. But [[SPARK-36620][SHUFFLE] Add client side push based shuffle metrics](https://github.com/apache/spark/pull/34000) has to use these metrics and populate it which is currently ongoing. Do you think we can move this change as well to that PR or wait for this one to get merged and then correspondingly make the changes in the other PR to consume these metrics? cc @thejdeep 

##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -2487,6 +2501,21 @@ private[spark] class DAGScheduler(
       executorFailureEpoch -= execId
     }
     shuffleFileLostEpoch -= execId
+
+    if (pushBasedShuffleEnabled) {

Review comment:
       Agree. Let me see how we can send an event when a new blockManager's host which is not already part of existing shuffleMergers and correspondingly fire an event to DAGScheduler




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r779287873



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -1281,6 +1333,22 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
     }
   }
 
+  override def getShufflePushMergerLocations(shuffleId: Int): Seq[BlockManagerId] = {
+    shufflePushMergerLocations.get(shuffleId).getOrElse(getMergerLocations(shuffleId))
+  }
+
+  private def getMergerLocations(shuffleId: Int): Seq[BlockManagerId] = {
+    fetchingLock.withLock(shuffleId) {

Review comment:
       Any issues with reusing `fetchingLock` here ?
   +CC @Ngone51 in case there are concerns.

##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -1281,6 +1333,22 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
     }
   }
 
+  override def getShufflePushMergerLocations(shuffleId: Int): Seq[BlockManagerId] = {
+    shufflePushMergerLocations.get(shuffleId).getOrElse(getMergerLocations(shuffleId))

Review comment:
       `shufflePushMergerLocations.getOrElse` here ?

##########
File path: core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
##########
@@ -39,7 +39,9 @@ class StageInfo(
     val taskMetrics: TaskMetrics = null,
     private[spark] val taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty,
     private[spark] val shuffleDepId: Option[Int] = None,
-    val resourceProfileId: Int) {
+    val resourceProfileId: Int,
+    private[spark] var isPushBasedShuffleEnabled: Boolean = false,
+    private[spark] var shuffleMergerCount: Int = 0) {

Review comment:
       Why do we need these ?

##########
File path: core/src/main/scala/org/apache/spark/shuffle/ShuffleWriteProcessor.scala
##########
@@ -59,13 +60,26 @@ private[spark] class ShuffleWriteProcessor extends Serializable with Logging {
         rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
       val mapStatus = writer.stop(success = true)
       if (mapStatus.isDefined) {
+        val isPushBasedShuffleEnabled = Utils.isPushBasedShuffleEnabled(SparkEnv.get.conf,
+          isDriver = SparkContext.DRIVER_IDENTIFIER == SparkEnv.get.executorId)
+        // Check if sufficient shuffle mergers are available now for the ShuffleMapTask to push
+        if (isPushBasedShuffleEnabled && dep.getMergerLocs.isEmpty && !dep.shuffleMergeFinalized) {

Review comment:
       Review note: Here we are depending on the fact that retry of determinate stages will be merge finalized - while indeterminate stages wont be - and so the `!dep.shuffleMergeFinalized` takes care of not trying to fetch mergers/enable push based shuffle for retry of determinate stage.

##########
File path: core/src/main/scala/org/apache/spark/Dependency.scala
##########
@@ -144,12 +144,16 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
     _shuffleMergedFinalized = true
   }
 
+  def shuffleMergeFinalized: Boolean = {
+    _shuffleMergedFinalized
+  }
+
   /**
    * Returns true if push-based shuffle is disabled for this stage or empty RDD,
    * or if the shuffle merge for this stage is finalized, i.e. the shuffle merge
    * results for all partitions are available.
    */
-  def shuffleMergeFinalized: Boolean = {
+  def isShuffleMergeOutputsAvailable: Boolean = {

Review comment:
       The method name and semantics of it are a bit confusing (even though we document the expectation).
   We are returning `true` here when push based shuffle is disabled. The expectation would be to respond with `false` for this method name.
   
   How about rename the new `shuffleMergeFinalized` to `isShufleMergeFinalizeEnabled` and make this method `isShuffleMergeFinalizeCompleted` ?
   (Or something similar and descriptive)

##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -1371,19 +1371,33 @@ private[spark] class DAGScheduler(
   private def prepareShuffleServicesForShuffleMapStage(stage: ShuffleMapStage): Unit = {
     assert(stage.shuffleDep.shuffleMergeEnabled && !stage.shuffleDep.shuffleMergeFinalized)
     if (stage.shuffleDep.getMergerLocs.isEmpty) {
+      getAndSetShufflePushMergerLocations(stage)
+    }
+
+    if (stage.shuffleDep.shuffleMergeEnabled) {
+      logInfo(("Shuffle merge enabled before starting the stage for %s (%s) with %d" +
+        " merger locations").format(stage, stage.name, stage.shuffleDep.getMergerLocs.size))
+    } else {
+      logInfo(("Shuffle merge disabled for %s (%s), but can get enabled later" +
+        " adaptively once enough mergers are available").format(stage, stage.name))

Review comment:
       Review note: This method is invoked only for:
   * Initial stage attempt for a shuffle.
   * All attempts for an indeterminate stage (since prev outputs are discarded)
   * Only first attempt for a determinate stage, and not for subsequent attempts.
   
   Given this, the log message is correct.

##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -429,13 +441,17 @@ private[spark] case class GetMapOutputStatuses(shuffleId: Int)
   extends MapOutputTrackerMessage
 private[spark] case class GetMapAndMergeResultStatuses(shuffleId: Int)
   extends MapOutputTrackerMessage
+private[spark] case class GetShufflePushMergerLocations(shuffleId: Int)
+  extends MapOutputTrackerMessage
 private[spark] case object StopMapOutputTracker extends MapOutputTrackerMessage
 
 private[spark] sealed trait MapOutputTrackerMasterMessage
 private[spark] case class GetMapOutputMessage(shuffleId: Int,
   context: RpcCallContext) extends MapOutputTrackerMasterMessage
 private[spark] case class GetMapAndMergeOutputMessage(shuffleId: Int,
   context: RpcCallContext) extends MapOutputTrackerMasterMessage
+private[spark] case class GetShuffleMergersMessage(shuffleId: Int,
+  context: RpcCallContext) extends MapOutputTrackerMasterMessage

Review comment:
       Rename to `GetShuflePushMergersMessage` to name it similar to its `MapOutputTrackerMessage` message ?
   

##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -2487,6 +2501,21 @@ private[spark] class DAGScheduler(
       executorFailureEpoch -= execId
     }
     shuffleFileLostEpoch -= execId
+
+    if (pushBasedShuffleEnabled) {

Review comment:
       Do this only if new executor is on a new host ?
   One thought exercise would be to fire an event to DAGScheduler from `BlockManagerMasterEndpoint.addMergerLocation` when a new merger location is added -  `getAndSetShufflePushMergerLocations` could result in populating new mergers only in that case anyway.

##########
File path: core/src/main/scala/org/apache/spark/Dependency.scala
##########
@@ -144,12 +144,16 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
     _shuffleMergedFinalized = true
   }
 
+  def shuffleMergeFinalized: Boolean = {

Review comment:
       This redefinition of `shuffleMergeFinalized `, particularly given the earlier semantics of `shuffleMergeFinalized`, is a bit confusing - I need to relook at how `shuffleMergeFinalized` is used to ensure there are no issues with this.

##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -457,6 +473,12 @@ private[spark] class MapOutputTrackerMasterEndpoint(
       logInfo(s"Asked to send map/merge result locations for shuffle $shuffleId to $hostPort")
       tracker.post(GetMapAndMergeOutputMessage(shuffleId, context))
 
+    case GetShufflePushMergerLocations(shuffleId: Int) =>
+      val hostPort = context.senderAddress.hostPort
+      logInfo(s"Asked to send shuffle push merger locations for shuffle" +
+        s" $shuffleId to $hostPort")

Review comment:
       nit: move `hostPort` into the `logInfo`
   Same below as well.

##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -2487,6 +2501,21 @@ private[spark] class DAGScheduler(
       executorFailureEpoch -= execId
     }
     shuffleFileLostEpoch -= execId
+
+    if (pushBasedShuffleEnabled) {
+      // Only set merger locations for stages that are not yet finished and have empty mergers
+      shuffleIdToMapStage.filter { case (_, stage) =>

Review comment:
       Instead of iterating over all entries in `shuffleIdToMapStage` - materialize subset of stages which satisfy the condition `stage.shuffleDep.getMergerLocs.isEmpty` in a `Set` ?
   We know which stages need to be in this set based on whether stage is candidate for enabling push based shuffle and which had the initial set of mergers empty while submitting the stage.
   We can clean up the `Set` when stage completes.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] venkata91 commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
venkata91 commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r779745057



##########
File path: core/src/main/scala/org/apache/spark/Dependency.scala
##########
@@ -144,12 +144,16 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
     _shuffleMergedFinalized = true
   }
 
+  def shuffleMergeFinalized: Boolean = {

Review comment:
       Yes I agree. With Adaptively fetching shuffle mergers, we need to check whether the `shuffleId` is `shuffleMergeFinalized` or not but that internally checks `shuffleMergeEnabled` which would be false since we didn't have enough mergers during the start of the stage. Which is why we need to have an additional method distinguishing whether shuffle merge is finalized vs whether shuffleMergeEnabled and then shuffleMergeFinalized whenever we try to figure which stage to be computed next. We need to prevent the next stage from starting if the previous stage shuffle merge finalization is yet to be completed ([here](https://github.com/apache/spark/blob/f6128a6f4215dc45a19209d799dd9bf98fab6d8a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L731)).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] venkata91 commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
venkata91 commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r788286262



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -1176,6 +1223,9 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
   // instantiate a serializer. See the followup to SPARK-36705 for more details.
   private lazy val fetchMergeResult = Utils.isPushBasedShuffleEnabled(conf, isDriver = false)
 
+  // Exposed for testing
+  val shufflePushMergerLocations = new ConcurrentHashMap[Int, Seq[BlockManagerId]]().asScala

Review comment:
       1. For `MapOutputTrackerMaster`, instead of keeping track of both `<shuffleId, shuffleMergeId>` for shuffle merger locations, I think we can piggy back on `unregisterAllMapAndMergeOutput` to clear the `mergers` in`ShuffleStatus` 
   
   2. For `MapOutputTrackerWorker` based on this [comment](https://github.com/apache/spark/pull/34122#discussion_r785388799)), we can clear the `shufflePushMergerLocations` in `updateEpoch`.
   
    This makes the code much simpler and neat. Let me know if you think otherwise or this has any other issues.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] venkata91 commented on pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
venkata91 commented on pull request #34122:
URL: https://github.com/apache/spark/pull/34122#issuecomment-1017042155


   Addressed the review comments @mridulm Please take a look whenever you have some time. Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] venkata91 commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
venkata91 commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r788292151



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -1364,6 +1430,7 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
   def unregisterShuffle(shuffleId: Int): Unit = {
     mapStatuses.remove(shuffleId)
     mergeStatuses.remove(shuffleId)
+    shufflePushMergerLocations.remove(shuffleId)

Review comment:
       For the sake of consistency, leaving the `shufflePushMergerLocations.remove(shuffleId)` here.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r785389352



##########
File path: core/src/main/scala/org/apache/spark/Dependency.scala
##########
@@ -106,7 +106,17 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
 
   // By default, shuffle merge is enabled for ShuffleDependency if push based shuffle
   // is enabled
-  private[this] var _shuffleMergeEnabled = canShuffleMergeBeEnabled()
+  private[this] var _shuffleMergeAllowed = canShuffleMergeBeEnabled()
+
+  private[spark] def setShuffleMergeAllowed(shuffleMergeAllowed: Boolean): Unit = {
+    _shuffleMergeAllowed = shuffleMergeAllowed
+  }
+
+  def shuffleMergeAllowed : Boolean = _shuffleMergeAllowed
+
+  // By default, shuffle merge is enabled for ShuffleDependency if push based shuffle
+  // is enabled
+  private[this] var _shuffleMergeEnabled = shuffleMergeAllowed
 
   private[spark] def setShuffleMergeEnabled(shuffleMergeEnabled: Boolean): Unit = {
     _shuffleMergeEnabled = shuffleMergeEnabled

Review comment:
       With all the changes we have made, do we need this boolean ? Or will `mergerLocs.nonEmpty` not do ? (`def shuffleMergeEnabled : Boolean = mergerLocs.nonEmpty`)
   
   Also, in `setShuffleMergeEnabled ` (if we are keeping it) or in `setMergerLocs`, add`assert (! shuffleMergeEnabled  || shuffleMergeAllowed)`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] venkata91 commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
venkata91 commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r785245294



##########
File path: core/src/main/scala/org/apache/spark/Dependency.scala
##########
@@ -144,12 +144,16 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
     _shuffleMergedFinalized = true
   }
 
+  def shuffleMergeFinalized: Boolean = {

Review comment:
       I refactored this in to `shuffleMergeAllowed` and `shuffleMergeEnabled` to have a clear distinction  where `shuffleMergeAllowed` controls the static level of knobs like the following:
   1. `Is RDD barrier?`
   2. `Can Push shuffle be enabled?`
   3. `Disabling push shuffle for retry once the determinate stage attempt is finalized` etc.
   and `shuffleMergeEnabled` will be checked only when `shuffleMergeAllowed` is true along with that if sufficient mergers are available then it becomes `true`.
   
   Given all the above, I think we still require two separate methods one for `isShuffleMergeFinalized` only checking for the `shuffleMergedFinalized` value and `numPartitions > 0` and another `isShuffleMergeFinalizedIfEnabled` checking both `shuffleMergeEnabled` and `isShuffleMergeFinalized`.
    
    In the existing code, before the `ShuffleMapStage` starts we are checking `if (!shuffleMergeFinalized)` then only we are calling `prepareShuffleServicesForShuffleMapStage` but it is possible the previous stage never had `shuffleMergeEnabled` due to not enough mergers therefore even the retry also not be shuffle merge enabled, ideally this can be shuffle merge enabled if enough mergers are available. But for proceeding with the next stage, we need to check for `isShuffleMergeFinalizedIfEnabled` which is checking `if (shuffleMergeEnabled) isShuffleMergeFinalized else true`
    
    Let me know what you think.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] venkata91 commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
venkata91 commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r793006586



##########
File path: core/src/main/scala/org/apache/spark/Dependency.scala
##########
@@ -114,15 +114,6 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
 
   def shuffleMergeAllowed : Boolean = _shuffleMergeAllowed
 
-  // By default, shuffle merge is enabled for ShuffleDependency if shuffleMergeAllowed
-  private[this] var _shuffleMergeEnabled = shuffleMergeAllowed
-
-  private[spark] def setShuffleMergeEnabled(shuffleMergeEnabled: Boolean): Unit = {
-    _shuffleMergeEnabled = shuffleMergeEnabled
-  }
-
-  def shuffleMergeEnabled : Boolean = _shuffleMergeEnabled

Review comment:
       Yeah. thats true. We are using `mergerLocs.isEmpty` in other places as well. If we use `shuffleMergeEnabled (mergerLocs.nonEmpty)` and `mergerLocs.isEmpty|nonEmpty` in few other places, wouldn't it be confusing? Do you suggest to replace all the usages of `mergerLocs.isEmpty|nonEmpty` appropriately with `shuffleMergeEnabled | !shuffleMergeEnabled`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] venkata91 commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
venkata91 commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r793881581



##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -2608,8 +2608,12 @@ private[spark] class DAGScheduler(
     // isPushBasedShuffleEnabled and shuffleMergers need to be updated at the end.
     stage match {
       case s: ShuffleMapStage =>
-        stage.latestInfo.setPushBasedShuffleEnabled(s.shuffleDep.getMergerLocs.nonEmpty)
-        stage.latestInfo.setShuffleMergerCount(s.shuffleDep.getMergerLocs.size)
+        val isPushShuffleEnabled =
+          s.shuffleDep.shuffleMergeAllowed && s.shuffleDep.getMergerLocs.nonEmpty

Review comment:
       Yes exactly. That makes sense.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #34122: [WIP][SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34122:
URL: https://github.com/apache/spark/pull/34122#issuecomment-928425690


   **[Test build #143662 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/143662/testReport)** for PR 34122 at commit [`13ad530`](https://github.com/apache/spark/commit/13ad530ca6ca574336f15192fab584fe6400cf24).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #34122: [WIP][SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #34122:
URL: https://github.com/apache/spark/pull/34122#issuecomment-928558795






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r792766502



##########
File path: core/src/main/scala/org/apache/spark/Dependency.scala
##########
@@ -114,15 +114,6 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
 
   def shuffleMergeAllowed : Boolean = _shuffleMergeAllowed
 
-  // By default, shuffle merge is enabled for ShuffleDependency if shuffleMergeAllowed
-  private[this] var _shuffleMergeEnabled = shuffleMergeAllowed
-
-  private[spark] def setShuffleMergeEnabled(shuffleMergeEnabled: Boolean): Unit = {
-    _shuffleMergeEnabled = shuffleMergeEnabled
-  }
-
-  def shuffleMergeEnabled : Boolean = _shuffleMergeEnabled

Review comment:
       Let us keep this method - and make it
   `def shuffleMergeEnabled = mergerLocs.nonEmpty`
   What I meant was, let us remove the boolean `_shuffleMergeEnabled`
   
   So that rest of the code can depend on this as a state and not need to inspect the mergerLocs directly
   
   
   Also, this is part of public api - so cant remove it :-)
   Speaking of which, can you make `getFinalizeTask`/`getFinalizeTask` `private[spark]` as well ? (this or different pr - `ShuffleDependency` is developer api and we cant leak impl details)
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r792766502



##########
File path: core/src/main/scala/org/apache/spark/Dependency.scala
##########
@@ -114,15 +114,6 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
 
   def shuffleMergeAllowed : Boolean = _shuffleMergeAllowed
 
-  // By default, shuffle merge is enabled for ShuffleDependency if shuffleMergeAllowed
-  private[this] var _shuffleMergeEnabled = shuffleMergeAllowed
-
-  private[spark] def setShuffleMergeEnabled(shuffleMergeEnabled: Boolean): Unit = {
-    _shuffleMergeEnabled = shuffleMergeEnabled
-  }
-
-  def shuffleMergeEnabled : Boolean = _shuffleMergeEnabled

Review comment:
       Let us keep this method - and make it
   `def shuffleMergeEnabled = mergerLocs.nonEmpty`
   What I meant was, let us remove the boolean `_shuffleMergeEnabled`
   
   So that rest of the code can depend on this as a state and not need to inspect the mergerLocs directly
   
   
   Also, this is part of public api - so cant remove it :-)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] venkata91 commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
venkata91 commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r792325877



##########
File path: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
##########
@@ -4147,7 +4146,210 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     assert(finalizeTask2.delay == 0 && finalizeTask2.registerMergeResults)
   }
 
-    /**
+  test("SPARK-34826: Adaptively fetch shuffle mergers") {
+    initPushBasedShuffleConfs(conf)
+    conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 2)
+    DAGSchedulerSuite.clearMergerLocs()
+    DAGSchedulerSuite.addMergerLocs(Seq("host1"))
+    val parts = 2
+
+    val shuffleMapRdd = new MyRDD(sc, parts, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts))
+    val reduceRdd = new MyRDD(sc, parts, List(shuffleDep), tracker = mapOutputTracker)
+
+    // Submit a reduce job that depends which will create a map stage
+    submit(reduceRdd, (0 until parts).toArray)
+
+    runEvent(makeCompletionEvent(
+      taskSets(0).tasks(0), Success, makeMapStatus("hostA", parts),
+      Seq.empty, Array.empty, createFakeTaskInfoWithId(0)))
+
+    val shuffleStage1 = scheduler.stageIdToStage(0).asInstanceOf[ShuffleMapStage]
+    assert(shuffleStage1.shuffleDep.getMergerLocs.isEmpty)
+    assert(mapOutputTracker.getShufflePushMergerLocations(0).isEmpty)
+
+    DAGSchedulerSuite.addMergerLocs(Seq("host2", "host3"))
+
+    // host2 executor added event to trigger registering of shuffle merger locations
+    // as shuffle mergers are tracked separately for test
+    runEvent(ExecutorAdded("host2", "host2"))
+
+    // Check if new shuffle merger locations are available for push or not
+    assert(mapOutputTracker.getShufflePushMergerLocations(0).size == 2)
+    assert(shuffleStage1.shuffleDep.getMergerLocs.size == 2)
+
+    // Complete remaining tasks in ShuffleMapStage 0
+    runEvent(makeCompletionEvent(taskSets(0).tasks(1), Success,
+      makeMapStatus("host1", parts), Seq.empty, Array.empty, createFakeTaskInfoWithId(1)))
+
+    completeNextResultStageWithSuccess(1, 0)
+    assert(results === Map(0 -> 42, 1 -> 42))
+
+    results.clear()
+    assertDataStructuresEmpty()
+  }
+
+  test("SPARK-34826: Adaptively fetch shuffle mergers with stage retry") {
+    initPushBasedShuffleConfs(conf)
+    conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 2)
+    DAGSchedulerSuite.clearMergerLocs()
+    DAGSchedulerSuite.addMergerLocs(Seq("host1"))
+    val parts = 2
+
+    val shuffleMapRdd1 = new MyRDD(sc, parts, Nil)
+    val shuffleDep1 = new ShuffleDependency(shuffleMapRdd1, new HashPartitioner(parts))
+    val shuffleMapRdd2 = new MyRDD(sc, parts, Nil)
+    val shuffleDep2 = new ShuffleDependency(shuffleMapRdd2, new HashPartitioner(parts))
+    val reduceRdd = new MyRDD(sc, parts, List(shuffleDep1, shuffleDep2),
+      tracker = mapOutputTracker)
+
+    // Submit a reduce job that depends which will create a map stage
+    submit(reduceRdd, (0 until parts).toArray)
+
+    val taskResults = taskSets(0).tasks.zipWithIndex.map {
+      case (_, idx) =>
+        (Success, makeMapStatus("host" + idx, parts))
+    }.toSeq
+
+    val shuffleStage1 = scheduler.stageIdToStage(0).asInstanceOf[ShuffleMapStage]
+    DAGSchedulerSuite.addMergerLocs(Seq("host2", "host3"))
+    // host2 executor added event to trigger registering of shuffle merger locations
+    // as shuffle mergers are tracked separately for test
+    runEvent(ExecutorAdded("host2", "host2"))
+    // Check if new shuffle merger locations are available for push or not
+    assert(mapOutputTracker.getShufflePushMergerLocations(0).size == 2)
+    assert(shuffleStage1.shuffleDep.getMergerLocs.size == 2)
+    val mergerLocsBeforeRetry = shuffleStage1.shuffleDep.getMergerLocs
+
+    // Clear merger locations to check if new mergers are not getting set for the
+    // retry of determinate stage
+    DAGSchedulerSuite.clearMergerLocs()
+
+    // Remove MapStatus on one of the host before the stage ends to trigger
+    // a scenario where stage 0 needs to be resubmitted upon finishing all tasks.
+    // Merge finalization should be scheduled in this case.
+    for ((result, i) <- taskResults.zipWithIndex) {
+      if (i == taskSets(0).tasks.size - 1) {
+        mapOutputTracker.removeOutputsOnHost("host0")
+      }
+      if (i < taskSets(0).tasks.size) {
+        runEvent(makeCompletionEvent(taskSets(0).tasks(i), result._1, result._2))
+      }
+    }
+    assert(shuffleStage1.shuffleDep.shuffleMergeFinalized)
+
+    DAGSchedulerSuite.addMergerLocs(Seq("host4", "host5"))
+    // host4 executor added event shouldn't reset merger locations given merger locations
+    // are already set
+    runEvent(ExecutorAdded("host4", "host4"))
+
+    // Successfully completing the retry of stage 0.
+    complete(taskSets(2), taskSets(2).tasks.zipWithIndex.map {
+      case (_, idx) =>
+        (Success, makeMapStatus("host" + idx, parts))
+    }.toSeq)
+
+    assert(shuffleStage1.shuffleDep.shuffleMergeId == 0)
+    assert(shuffleStage1.shuffleDep.getMergerLocs.size == 2)
+    assert(shuffleStage1.shuffleDep.shuffleMergeFinalized)
+    val newMergerLocs =
+      scheduler.stageIdToStage(0).asInstanceOf[ShuffleMapStage].shuffleDep.getMergerLocs
+    assert(mergerLocsBeforeRetry.sortBy(_.host) === newMergerLocs.sortBy(_.host))
+    val shuffleStage2 = scheduler.stageIdToStage(1).asInstanceOf[ShuffleMapStage]
+    complete(taskSets(1), taskSets(1).tasks.zipWithIndex.map {
+      case (_, idx) =>
+        (Success, makeMapStatus("host" + idx, parts, 10))
+    }.toSeq)
+    assert(shuffleStage2.shuffleDep.getMergerLocs.size == 2)
+    completeNextResultStageWithSuccess(2, 0)
+    assert(results === Map(0 -> 42, 1 -> 42))
+
+    results.clear()
+    assertDataStructuresEmpty()
+  }
+
+  test("SPARK-34826: Adaptively fetch shuffle mergers with stage retry for indeterminate stage") {
+    initPushBasedShuffleConfs(conf)
+    conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 2)
+    DAGSchedulerSuite.clearMergerLocs()
+    DAGSchedulerSuite.addMergerLocs(Seq("host1"))
+    val parts = 2
+
+    val shuffleMapRdd1 = new MyRDD(sc, parts, Nil, indeterminate = true)
+    val shuffleDep1 = new ShuffleDependency(shuffleMapRdd1, new HashPartitioner(parts))
+    val shuffleMapRdd2 = new MyRDD(sc, parts, Nil, indeterminate = true)
+    val shuffleDep2 = new ShuffleDependency(shuffleMapRdd2, new HashPartitioner(parts))
+    val reduceRdd = new MyRDD(sc, parts, List(shuffleDep1, shuffleDep2),
+      tracker = mapOutputTracker)
+
+    // Submit a reduce job that depends which will create a map stage
+    submit(reduceRdd, (0 until parts).toArray)
+
+    val taskResults = taskSets(0).tasks.zipWithIndex.map {
+      case (_, idx) =>
+        (Success, makeMapStatus("host" + idx, parts))
+    }.toSeq
+
+    val shuffleStage1 = scheduler.stageIdToStage(0).asInstanceOf[ShuffleMapStage]
+    DAGSchedulerSuite.addMergerLocs(Seq("host2", "host3"))
+    // host2 executor added event to trigger registering of shuffle merger locations
+    // as shuffle mergers are tracked separately for test
+    runEvent(ExecutorAdded("host2", "host2"))
+    // Check if new shuffle merger locations are available for push or not
+    assert(mapOutputTracker.getShufflePushMergerLocations(0).size == 2)
+    assert(shuffleStage1.shuffleDep.getMergerLocs.size == 2)
+    val mergerLocsBeforeRetry = shuffleStage1.shuffleDep.getMergerLocs
+
+    // Clear merger locations to check if new mergers are getting set for the
+    // retry of indeterminate stage
+    DAGSchedulerSuite.clearMergerLocs()
+
+    // Remove MapStatus on one of the host before the stage ends to trigger
+    // a scenario where stage 0 needs to be resubmitted upon finishing all tasks.
+    // Merge finalization should be scheduled in this case.
+    for ((result, i) <- taskResults.zipWithIndex) {
+      if (i == taskSets(0).tasks.size - 1) {
+        mapOutputTracker.removeOutputsOnHost("host0")
+      }
+      if (i < taskSets(0).tasks.size) {

Review comment:
       No, internally we had this and just carried over as part of forward port. This is not required.

##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -1369,24 +1370,34 @@ private[spark] class DAGScheduler(
    * locations for block push/merge by getting the historical locations of past executors.
    */
   private def prepareShuffleServicesForShuffleMapStage(stage: ShuffleMapStage): Unit = {
-    assert(stage.shuffleDep.shuffleMergeEnabled && !stage.shuffleDep.shuffleMergeFinalized)
+    assert(stage.shuffleDep.shuffleMergeAllowed && !stage.shuffleDep.shuffleMergeFinalized)
     if (stage.shuffleDep.getMergerLocs.isEmpty) {
-      val mergerLocs = sc.schedulerBackend.getShufflePushMergerLocations(
-        stage.shuffleDep.partitioner.numPartitions, stage.resourceProfileId)
-      if (mergerLocs.nonEmpty) {
-        stage.shuffleDep.setMergerLocs(mergerLocs)
-        logInfo(s"Push-based shuffle enabled for $stage (${stage.name}) with" +
-          s" ${stage.shuffleDep.getMergerLocs.size} merger locations")
-
-        logDebug("List of shuffle push merger locations " +
-          s"${stage.shuffleDep.getMergerLocs.map(_.host).mkString(", ")}")
-      } else {
-        stage.shuffleDep.setShuffleMergeEnabled(false)
-        logInfo(s"Push-based shuffle disabled for $stage (${stage.name})")
-      }
+      getAndSetShufflePushMergerLocations(stage)
+    }
+
+    if (stage.shuffleDep.shuffleMergeEnabled) {
+      logInfo(("Shuffle merge enabled before starting the stage for %s (%s) with %d" +
+        " merger locations").format(stage, stage.name, stage.shuffleDep.getMergerLocs.size))

Review comment:
       Added.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] venkata91 commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
venkata91 commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r795960916



##########
File path: core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
##########
@@ -131,7 +131,7 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager
       metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = {
     val baseShuffleHandle = handle.asInstanceOf[BaseShuffleHandle[K, _, C]]
     val (blocksByAddress, canEnableBatchFetch) =
-      if (baseShuffleHandle.dependency.shuffleMergeEnabled) {
+      if (baseShuffleHandle.dependency.isShuffleMergeFinalizedMarked) {

Review comment:
       @mridulm For the retry case, `shuffleMergeEnabled` would be false but `isShuffleMergeFinalizedMarked` but we still want this to return true right to fetch merge blocks generated by the first attempt? Therefore, won't `isShuffleMergeFinalizedMarked` be enough? Basically as long as shuffle merge is finalized (doesn't matter if shuffle merge is allowed or not), check for merge blocks to be fetched.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r796060753



##########
File path: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
##########
@@ -3613,8 +3613,8 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     val shuffleStage2 = scheduler.stageIdToStage(1).asInstanceOf[ShuffleMapStage]
     assert(shuffleStage2.shuffleDep.getMergerLocs.nonEmpty)
 
-    assert(shuffleStage2.shuffleDep.shuffleMergeFinalized)
-    assert(shuffleStage1.shuffleDep.shuffleMergeFinalized)
+    assert(shuffleStage2.shuffleDep.isShuffleMergeFinalizedMarked)
+    assert(shuffleStage1.shuffleDep.isShuffleMergeFinalizedMarked)

Review comment:
       I am slightly on the fence for some of these test reverts (`shuffleMergeFinalized` -> `isShuffleMergeFinalizedMarked`), since we are doing a stronger check now.
   So I am fine with leaving them as-is as well.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] venkata91 commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
venkata91 commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r797942512



##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -1821,7 +1834,7 @@ private[spark] class DAGScheduler(
             }
 
             if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) {
-              if (!shuffleStage.shuffleDep.shuffleMergeFinalized &&
+              if (!shuffleStage.shuffleDep.isShuffleMergeFinalizedMarked &&
                 shuffleStage.shuffleDep.getMergerLocs.nonEmpty) {
                 checkAndScheduleShuffleMergeFinalize(shuffleStage)
               } else {

Review comment:
       Filed on for this issue as well - https://issues.apache.org/jira/browse/SPARK-38093




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] venkata91 commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
venkata91 commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r797942898



##########
File path: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
##########
@@ -3671,7 +3671,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
 
     completeShuffleMapStageSuccessfully(0, 0, parts)
     val shuffleStage = scheduler.stageIdToStage(0).asInstanceOf[ShuffleMapStage]
-    assert(!shuffleStage.shuffleDep.shuffleMergeEnabled)
+    assert(shuffleStage.shuffleDep.mergerLocs.isEmpty)

Review comment:
       Addressed it now. Sorry probably in the refactor, it went back.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r793837336



##########
File path: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
##########
@@ -3686,14 +3686,13 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     completeNextStageWithFetchFailure(3, 0, shuffleDep)
     scheduler.resubmitFailedStages()
 
-    // Make sure shuffle merge is disabled for the retry
     val stage2 = scheduler.stageIdToStage(2).asInstanceOf[ShuffleMapStage]
-    assert(!stage2.shuffleDep.shuffleMergeEnabled)
+    assert(stage2.shuffleDep.shuffleMergeEnabled)

Review comment:
       Thanks for clarifying.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] venkata91 commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
venkata91 commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r787352423



##########
File path: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
##########
@@ -4147,7 +4146,128 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     assert(finalizeTask2.delay == 0 && finalizeTask2.registerMergeResults)
   }
 
-    /**
+  test("SPARK-34826: Adaptively fetch shuffle mergers") {
+    initPushBasedShuffleConfs(conf)
+    conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 6)
+    DAGSchedulerSuite.clearMergerLocs
+    DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", "host5"))
+    val parts = 7
+
+    val shuffleMapRdd = new MyRDD(sc, parts, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts))
+    val reduceRdd = new MyRDD(sc, parts, List(shuffleDep), tracker = mapOutputTracker)
+
+    // Submit a reduce job that depends which will create a map stage
+    submit(reduceRdd, (0 until parts).toArray)
+
+    runEvent(makeCompletionEvent(
+      taskSets(0).tasks(0), Success, makeMapStatus("hostA", parts),
+      Seq.empty, Array.empty, createFakeTaskInfoWithId(0)))
+
+    val shuffleStage1 = scheduler.stageIdToStage(0).asInstanceOf[ShuffleMapStage]
+    assert(shuffleStage1.shuffleDep.getMergerLocs.isEmpty)
+    assert(mapOutputTracker.getShufflePushMergerLocations(0).isEmpty)
+
+    DAGSchedulerSuite.addMergerLocs(Seq("host6", "host7", "host8"))
+
+    runEvent(makeCompletionEvent(
+      taskSets(0).tasks(1), Success, makeMapStatus("hostA", parts),
+      Seq.empty, Array.empty, createFakeTaskInfoWithId(1)))
+
+    // Dummy executor added event to trigger registering of shuffle merger locations
+    // as shuffle mergers are tracked separately for test
+    runEvent(ExecutorAdded("dummy", "dummy"))
+
+    // Check if new shuffle merger locations are available for push or not
+    assert(mapOutputTracker.getShufflePushMergerLocations(0).size == 7)

Review comment:
       Since the `parts` (numPartitions) is 7 and also we have 8 merger locations available, it would return 7. `spark.shuffle.push.mergersMinStaticThreshold` is set to 6 but if we have `numDesired` (in this case 7) then we would get 7. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] venkata91 commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
venkata91 commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r788286262



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -1176,6 +1223,9 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
   // instantiate a serializer. See the followup to SPARK-36705 for more details.
   private lazy val fetchMergeResult = Utils.isPushBasedShuffleEnabled(conf, isDriver = false)
 
+  // Exposed for testing
+  val shufflePushMergerLocations = new ConcurrentHashMap[Int, Seq[BlockManagerId]]().asScala

Review comment:
       Instead of keeping track of both <shuffleId, shuffleMergeId> for shuffle merger locations, I think we can piggy back on `unregisterAllMapAndMergeOutput` to clear the `mergers` in `ShuffleStatus` similarly in the `updateEpoch` (like this [comment](https://github.com/apache/spark/pull/34122#discussion_r785388799)) we can clear the `shufflePushMergerLocations`. This makes the code much simpler and neat. Let me know if you think otherwise or this has any other issues.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] asfgit closed pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
asfgit closed pull request #34122:
URL: https://github.com/apache/spark/pull/34122


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #34122: [WIP][SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #34122:
URL: https://github.com/apache/spark/pull/34122#issuecomment-928558795


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/48175/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins removed a comment on pull request #34122: [WIP][SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #34122:
URL: https://github.com/apache/spark/pull/34122#issuecomment-928558795






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
mridulm commented on pull request #34122:
URL: https://github.com/apache/spark/pull/34122#issuecomment-1006312364


   +CC @otterc 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] venkata91 commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
venkata91 commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r785245638



##########
File path: core/src/main/scala/org/apache/spark/Dependency.scala
##########
@@ -144,12 +144,16 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
     _shuffleMergedFinalized = true
   }
 
+  def shuffleMergeFinalized: Boolean = {
+    _shuffleMergedFinalized
+  }
+
   /**
    * Returns true if push-based shuffle is disabled for this stage or empty RDD,
    * or if the shuffle merge for this stage is finalized, i.e. the shuffle merge
    * results for all partitions are available.
    */
-  def shuffleMergeFinalized: Boolean = {
+  def isShuffleMergeOutputsAvailable: Boolean = {

Review comment:
       Agree, I have refactored this a bit and responded with a comment below. Please take a look and share your thoughts.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r785388090



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -1176,6 +1223,9 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
   // instantiate a serializer. See the followup to SPARK-36705 for more details.
   private lazy val fetchMergeResult = Utils.isPushBasedShuffleEnabled(conf, isDriver = false)
 
+  // Exposed for testing
+  val shufflePushMergerLocations = new ConcurrentHashMap[Int, Seq[BlockManagerId]]().asScala

Review comment:
       This needs to account for shuffle merge id as well - to ensure the merger locations are consistent.
   Possible flow is:
   a) attempt 0 has enough mergers, and so executors cache value for attempt 0 mergers.
   b) one or more hosts gets added to exclude list before attempt 1).
   c) attempt 1 starts, with merge allowed = true, but merge enabled = false - so tasks start without mergers populated in dependency (merge might get enabled as more executors are added).
   d) executors which ran attempt 0 will use mergers from previous attempt - new executors will use mergers from attempt 1.
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] venkata91 commented on pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
venkata91 commented on pull request #34122:
URL: https://github.com/apache/spark/pull/34122#issuecomment-1017788139


   Test in `HealthTrackerIntegrationSuite` is failing which I think is a transient issue. This is unrelated to this PR. Is this due to the `mutable map and immutable map being compared with ===`. Not sure why this issue is surfacing now.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] venkata91 commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
venkata91 commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r793887395



##########
File path: core/src/main/scala/org/apache/spark/Dependency.scala
##########
@@ -114,15 +114,6 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
 
   def shuffleMergeAllowed : Boolean = _shuffleMergeAllowed
 
-  // By default, shuffle merge is enabled for ShuffleDependency if shuffleMergeAllowed
-  private[this] var _shuffleMergeEnabled = shuffleMergeAllowed
-
-  private[spark] def setShuffleMergeEnabled(shuffleMergeEnabled: Boolean): Unit = {
-    _shuffleMergeEnabled = shuffleMergeEnabled
-  }
-
-  def shuffleMergeEnabled : Boolean = _shuffleMergeEnabled

Review comment:
       @mridulm How about `incPushCompleted` and `newShuffleMergeState`? Those should also be `private[spark]` right given that it is internal impl detail? 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r796929787



##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -1821,7 +1834,7 @@ private[spark] class DAGScheduler(
             }
 
             if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) {
-              if (!shuffleStage.shuffleDep.shuffleMergeFinalized &&
+              if (!shuffleStage.shuffleDep.isShuffleMergeFinalizedMarked &&
                 shuffleStage.shuffleDep.getMergerLocs.nonEmpty) {
                 checkAndScheduleShuffleMergeFinalize(shuffleStage)
               } else {

Review comment:
       In `processShuffleMapStageCompletion`, add something like 
   ```
       if (!shuffleStage.isIndeterminate && shuffleStage.shuffleDep.shuffleMergeEnabled) {
         shuffleStage.shuffleDep.setShuffleMergeAllowed(false)
       }
   ``` 
   to ensure we dont retry merge for determinate stages ?
   
   This is strictly not related to this PR, so I am fine with doing it in a follow up PR as well to keep the scope contained (we can file a jira in that case).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] venkata91 commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
venkata91 commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r797868653



##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -1821,7 +1834,7 @@ private[spark] class DAGScheduler(
             }
 
             if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) {
-              if (!shuffleStage.shuffleDep.shuffleMergeFinalized &&
+              if (!shuffleStage.shuffleDep.isShuffleMergeFinalizedMarked &&
                 shuffleStage.shuffleDep.getMergerLocs.nonEmpty) {
                 checkAndScheduleShuffleMergeFinalize(shuffleStage)
               } else {

Review comment:
       I was thinking we can add this change in the `handleShuffleMergeFinalized` which would be better right? Given this is specific to push-based shuffle, making the change in `handleShuffleMergeFinalized` (at the end of finalization) marking shuffle merge is not allowed for the retry. Thoughts? In any case, let me file a new JIRA for this one.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r797940716



##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -1821,7 +1834,7 @@ private[spark] class DAGScheduler(
             }
 
             if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) {
-              if (!shuffleStage.shuffleDep.shuffleMergeFinalized &&
+              if (!shuffleStage.shuffleDep.isShuffleMergeFinalizedMarked &&
                 shuffleStage.shuffleDep.getMergerLocs.nonEmpty) {
                 checkAndScheduleShuffleMergeFinalize(shuffleStage)
               } else {

Review comment:
       Sounds good, we can add it to `handleShuffleMergeFinalized` as well: that does look like a good place to introduce it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA removed a comment on pull request #34122: [WIP][SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #34122:
URL: https://github.com/apache/spark/pull/34122#issuecomment-928425690


   **[Test build #143662 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/143662/testReport)** for PR 34122 at commit [`13ad530`](https://github.com/apache/spark/commit/13ad530ca6ca574336f15192fab584fe6400cf24).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r796060753



##########
File path: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
##########
@@ -3613,8 +3613,8 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     val shuffleStage2 = scheduler.stageIdToStage(1).asInstanceOf[ShuffleMapStage]
     assert(shuffleStage2.shuffleDep.getMergerLocs.nonEmpty)
 
-    assert(shuffleStage2.shuffleDep.shuffleMergeFinalized)
-    assert(shuffleStage1.shuffleDep.shuffleMergeFinalized)
+    assert(shuffleStage2.shuffleDep.isShuffleMergeFinalizedMarked)
+    assert(shuffleStage1.shuffleDep.isShuffleMergeFinalizedMarked)

Review comment:
       I am slightly on the fence for some of these test reverts (`shuffleMergeFinalized` -> `isShuffleMergeFinalizedMarked`), since we are doing a stronger check now.
   So I am fine with leaving them as they are in this PR as well.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r794998416



##########
File path: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
##########
@@ -3686,14 +3686,13 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     completeNextStageWithFetchFailure(3, 0, shuffleDep)
     scheduler.resubmitFailedStages()
 
-    // Make sure shuffle merge is disabled for the retry
     val stage2 = scheduler.stageIdToStage(2).asInstanceOf[ShuffleMapStage]
-    assert(!stage2.shuffleDep.shuffleMergeEnabled)
+    assert(stage2.shuffleDep.shuffleMergeEnabled)

Review comment:
       I was thinking more about this ... should we always mark DETERMINATE stages with `setShuffleMergeAllowed(false)` when they complete ?
   It makes things more cleaner, and in line with expectation whether mergers were used for first execution or not.
   
   I dont feel very strongly about it though ...
   Thoughts ?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
mridulm commented on pull request #34122:
URL: https://github.com/apache/spark/pull/34122#issuecomment-1028579797


   Can you take a look at the test failure @venkata91 ? It looks relevant.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r793834972



##########
File path: core/src/main/scala/org/apache/spark/Dependency.scala
##########
@@ -202,7 +204,7 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
   // Only used by DAGScheduler to coordinate shuffle merge finalization
   @transient private[this] var finalizeTask: Option[ScheduledFuture[_]] = None
 
-  def getFinalizeTask: Option[ScheduledFuture[_]] = finalizeTask
+  private[spark] def getFinalizeTask: Option[ScheduledFuture[_]] = finalizeTask
 
   def setFinalizeTask(task: ScheduledFuture[_]): Unit = {

Review comment:
       Make the setter `private[spark]` as well




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r794698873



##########
File path: core/src/main/scala/org/apache/spark/Dependency.scala
##########
@@ -114,15 +114,6 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
 
   def shuffleMergeAllowed : Boolean = _shuffleMergeAllowed
 
-  // By default, shuffle merge is enabled for ShuffleDependency if shuffleMergeAllowed
-  private[this] var _shuffleMergeEnabled = shuffleMergeAllowed
-
-  private[spark] def setShuffleMergeEnabled(shuffleMergeEnabled: Boolean): Unit = {
-    _shuffleMergeEnabled = shuffleMergeEnabled
-  }
-
-  def shuffleMergeEnabled : Boolean = _shuffleMergeEnabled

Review comment:
       @venkata91 Let us fix (methods/variables which should be `private[spark]` but ended up getting exposed) the changes from master, which are not in 3.2/older - and open a jira for the others as a follow up.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] venkata91 commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
venkata91 commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r796195454



##########
File path: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
##########
@@ -3926,7 +3925,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     val finalizeTask1 = shuffleStage1.shuffleDep.getFinalizeTask.get
       .asInstanceOf[DummyScheduledFuture]
     assert(finalizeTask1.delay == 10 && finalizeTask1.registerMergeResults)
-    assert(shuffleStage1.shuffleDep.shuffleMergeFinalized)
+    assert(shuffleStage1.shuffleDep.isShuffleMergeFinalizedMarked)

Review comment:
       Based on the above [comment](https://github.com/apache/spark/pull/34122#discussion_r796060753), not making the change

##########
File path: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
##########
@@ -4051,8 +4050,8 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     runEvent(StageCancelled(0, Option("Explicit cancel check")))
     scheduler.handleShuffleMergeFinalized(shuffleStage1, shuffleStage1.shuffleDep.shuffleMergeId)
 
-    assert(shuffleStage1.shuffleDep.shuffleMergeEnabled)
-    assert(!shuffleStage1.shuffleDep.shuffleMergeFinalized)
+    assert(shuffleStage1.shuffleDep.mergerLocs.nonEmpty)
+    assert(!shuffleStage1.shuffleDep.isShuffleMergeFinalizedMarked)

Review comment:
       Based on the above [comment](https://github.com/apache/spark/pull/34122#discussion_r796060753), not making the change

##########
File path: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
##########
@@ -4082,7 +4081,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     assert(shuffleIndeterminateStage.isIndeterminate)
     scheduler.handleShuffleMergeFinalized(shuffleIndeterminateStage, 2)
     assert(shuffleIndeterminateStage.shuffleDep.shuffleMergeEnabled)
-    assert(!shuffleIndeterminateStage.shuffleDep.shuffleMergeFinalized)
+    assert(!shuffleIndeterminateStage.shuffleDep.isShuffleMergeFinalizedMarked)
   }

Review comment:
       Same as above




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] venkata91 commented on pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
venkata91 commented on pull request #34122:
URL: https://github.com/apache/spark/pull/34122#issuecomment-1026209657


   > I was thinking more about this ... should we always mark DETERMINATE stages with setShuffleMergeAllowed(false) when they complete ?
   > It makes things more cleaner, and in line with expectation whether mergers were used for first execution or not.
   > 
   > I dont feel very strongly about it though ...
   > Thoughts ?
   
   @mridulm This makes sense to me. Only thing is it can affect the tests as `shuffleMergeFinalized` also checks for `shuffleMergeEnabled` but that would return false and in turn return `true` always, but from your recent [comment](https://github.com/apache/spark/pull/34122#discussion_r796060753) that should not be the case anymore.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] venkata91 commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
venkata91 commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r796077720



##########
File path: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
##########
@@ -4147,7 +4146,206 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     assert(finalizeTask2.delay == 0 && finalizeTask2.registerMergeResults)
   }
 
-    /**
+  test("SPARK-34826: Adaptively fetch shuffle mergers") {
+    initPushBasedShuffleConfs(conf)
+    conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 2)
+    DAGSchedulerSuite.clearMergerLocs()
+    DAGSchedulerSuite.addMergerLocs(Seq("host1"))
+    val parts = 2
+
+    val shuffleMapRdd = new MyRDD(sc, parts, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts))
+    val reduceRdd = new MyRDD(sc, parts, List(shuffleDep), tracker = mapOutputTracker)
+
+    // Submit a reduce job that depends which will create a map stage
+    submit(reduceRdd, (0 until parts).toArray)
+
+    runEvent(makeCompletionEvent(
+      taskSets(0).tasks(0), Success, makeMapStatus("hostA", parts),
+      Seq.empty, Array.empty, createFakeTaskInfoWithId(0)))
+
+    val shuffleStage1 = scheduler.stageIdToStage(0).asInstanceOf[ShuffleMapStage]
+    assert(!shuffleStage1.shuffleDep.shuffleMergeEnabled)
+    assert(mapOutputTracker.getShufflePushMergerLocations(0).isEmpty)
+
+    DAGSchedulerSuite.addMergerLocs(Seq("host2", "host3"))
+
+    // host2 executor added event to trigger registering of shuffle merger locations
+    // as shuffle mergers are tracked separately for test
+    runEvent(ExecutorAdded("exec2", "host2"))
+
+    // Check if new shuffle merger locations are available for push or not
+    assert(mapOutputTracker.getShufflePushMergerLocations(0).size == 2)
+    assert(shuffleStage1.shuffleDep.getMergerLocs.size == 2)
+
+    // Complete remaining tasks in ShuffleMapStage 0
+    runEvent(makeCompletionEvent(taskSets(0).tasks(1), Success,
+      makeMapStatus("host1", parts), Seq.empty, Array.empty, createFakeTaskInfoWithId(1)))
+
+    completeNextResultStageWithSuccess(1, 0)
+    assert(results === Map(0 -> 42, 1 -> 42))
+
+    results.clear()
+    assertDataStructuresEmpty()
+  }
+
+  test("SPARK-34826: Adaptively fetch shuffle mergers with stage retry") {
+    initPushBasedShuffleConfs(conf)
+    conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 2)
+    DAGSchedulerSuite.clearMergerLocs()
+    DAGSchedulerSuite.addMergerLocs(Seq("host1"))
+    val parts = 2
+
+    val shuffleMapRdd1 = new MyRDD(sc, parts, Nil)
+    val shuffleDep1 = new ShuffleDependency(shuffleMapRdd1, new HashPartitioner(parts))
+    val shuffleMapRdd2 = new MyRDD(sc, parts, Nil)
+    val shuffleDep2 = new ShuffleDependency(shuffleMapRdd2, new HashPartitioner(parts))
+    val reduceRdd = new MyRDD(sc, parts, List(shuffleDep1, shuffleDep2),
+      tracker = mapOutputTracker)

Review comment:
       Sorry not getting this comment quite right. Do you mean we should merge both determinate/indeterminate stage retry test for the adaptively fetch shuffle mergers?

##########
File path: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
##########
@@ -4147,7 +4146,206 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     assert(finalizeTask2.delay == 0 && finalizeTask2.registerMergeResults)
   }
 
-    /**
+  test("SPARK-34826: Adaptively fetch shuffle mergers") {
+    initPushBasedShuffleConfs(conf)
+    conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 2)
+    DAGSchedulerSuite.clearMergerLocs()
+    DAGSchedulerSuite.addMergerLocs(Seq("host1"))
+    val parts = 2
+
+    val shuffleMapRdd = new MyRDD(sc, parts, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts))
+    val reduceRdd = new MyRDD(sc, parts, List(shuffleDep), tracker = mapOutputTracker)
+
+    // Submit a reduce job that depends which will create a map stage
+    submit(reduceRdd, (0 until parts).toArray)
+
+    runEvent(makeCompletionEvent(
+      taskSets(0).tasks(0), Success, makeMapStatus("hostA", parts),
+      Seq.empty, Array.empty, createFakeTaskInfoWithId(0)))
+
+    val shuffleStage1 = scheduler.stageIdToStage(0).asInstanceOf[ShuffleMapStage]
+    assert(!shuffleStage1.shuffleDep.shuffleMergeEnabled)
+    assert(mapOutputTracker.getShufflePushMergerLocations(0).isEmpty)
+
+    DAGSchedulerSuite.addMergerLocs(Seq("host2", "host3"))
+
+    // host2 executor added event to trigger registering of shuffle merger locations
+    // as shuffle mergers are tracked separately for test
+    runEvent(ExecutorAdded("exec2", "host2"))
+
+    // Check if new shuffle merger locations are available for push or not
+    assert(mapOutputTracker.getShufflePushMergerLocations(0).size == 2)
+    assert(shuffleStage1.shuffleDep.getMergerLocs.size == 2)
+
+    // Complete remaining tasks in ShuffleMapStage 0
+    runEvent(makeCompletionEvent(taskSets(0).tasks(1), Success,
+      makeMapStatus("host1", parts), Seq.empty, Array.empty, createFakeTaskInfoWithId(1)))
+
+    completeNextResultStageWithSuccess(1, 0)
+    assert(results === Map(0 -> 42, 1 -> 42))
+
+    results.clear()
+    assertDataStructuresEmpty()
+  }
+
+  test("SPARK-34826: Adaptively fetch shuffle mergers with stage retry") {
+    initPushBasedShuffleConfs(conf)
+    conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 2)
+    DAGSchedulerSuite.clearMergerLocs()
+    DAGSchedulerSuite.addMergerLocs(Seq("host1"))
+    val parts = 2
+
+    val shuffleMapRdd1 = new MyRDD(sc, parts, Nil)
+    val shuffleDep1 = new ShuffleDependency(shuffleMapRdd1, new HashPartitioner(parts))
+    val shuffleMapRdd2 = new MyRDD(sc, parts, Nil)
+    val shuffleDep2 = new ShuffleDependency(shuffleMapRdd2, new HashPartitioner(parts))
+    val reduceRdd = new MyRDD(sc, parts, List(shuffleDep1, shuffleDep2),
+      tracker = mapOutputTracker)

Review comment:
       @mridulm Sorry not getting this comment quite right. Do you mean we should merge both determinate/indeterminate stage retry test for the adaptively fetch shuffle mergers?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] venkata91 commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
venkata91 commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r791949482



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -1281,6 +1341,28 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
     }
   }
 
+  override def getShufflePushMergerLocations(shuffleId: Int): Seq[BlockManagerId] = {
+    shufflePushMergerLocations.getOrElse(shuffleId, getMergerLocations(shuffleId))
+  }
+
+  private def getMergerLocations(shuffleId: Int): Seq[BlockManagerId] = {
+    val mergers = shufflePushMergerLocations.get(shuffleId).orNull
+    if (mergers == null) {
+      fetchingLock.withLock(shuffleId) {
+        val fetchedMergers =
+          askTracker[Seq[BlockManagerId]](GetShufflePushMergerLocations(shuffleId))
+        if (fetchedMergers.nonEmpty) {
+          shufflePushMergerLocations(shuffleId) = fetchedMergers
+          fetchedMergers
+        } else {
+          Seq.empty[BlockManagerId]
+        }
+      }
+    } else {

Review comment:
       Oh. yeah. My bad.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] venkata91 commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
venkata91 commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r793016468



##########
File path: core/src/main/scala/org/apache/spark/Dependency.scala
##########
@@ -114,15 +114,6 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
 
   def shuffleMergeAllowed : Boolean = _shuffleMergeAllowed
 
-  // By default, shuffle merge is enabled for ShuffleDependency if shuffleMergeAllowed
-  private[this] var _shuffleMergeEnabled = shuffleMergeAllowed
-
-  private[spark] def setShuffleMergeEnabled(shuffleMergeEnabled: Boolean): Unit = {
-    _shuffleMergeEnabled = shuffleMergeEnabled
-  }
-
-  def shuffleMergeEnabled : Boolean = _shuffleMergeEnabled

Review comment:
       In that case, should we also make `shuffleMergeAllowed` as well `private[spark]`. For now it is better to make everything `private[spark]` related to push-based shuffle right? Thoughts?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r797717323



##########
File path: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
##########
@@ -3671,7 +3671,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
 
     completeShuffleMapStageSuccessfully(0, 0, parts)
     val shuffleStage = scheduler.stageIdToStage(0).asInstanceOf[ShuffleMapStage]
-    assert(!shuffleStage.shuffleDep.shuffleMergeEnabled)
+    assert(shuffleStage.shuffleDep.mergerLocs.isEmpty)

Review comment:
       Looks like this comment was missed ?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] venkata91 commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
venkata91 commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r787356062



##########
File path: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
##########
@@ -4147,7 +4146,128 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     assert(finalizeTask2.delay == 0 && finalizeTask2.registerMergeResults)
   }
 
-    /**
+  test("SPARK-34826: Adaptively fetch shuffle mergers") {
+    initPushBasedShuffleConfs(conf)
+    conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 6)
+    DAGSchedulerSuite.clearMergerLocs
+    DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", "host5"))
+    val parts = 7
+
+    val shuffleMapRdd = new MyRDD(sc, parts, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts))
+    val reduceRdd = new MyRDD(sc, parts, List(shuffleDep), tracker = mapOutputTracker)
+
+    // Submit a reduce job that depends which will create a map stage
+    submit(reduceRdd, (0 until parts).toArray)
+
+    runEvent(makeCompletionEvent(
+      taskSets(0).tasks(0), Success, makeMapStatus("hostA", parts),
+      Seq.empty, Array.empty, createFakeTaskInfoWithId(0)))
+
+    val shuffleStage1 = scheduler.stageIdToStage(0).asInstanceOf[ShuffleMapStage]
+    assert(shuffleStage1.shuffleDep.getMergerLocs.isEmpty)
+    assert(mapOutputTracker.getShufflePushMergerLocations(0).isEmpty)
+
+    DAGSchedulerSuite.addMergerLocs(Seq("host6", "host7", "host8"))
+
+    runEvent(makeCompletionEvent(
+      taskSets(0).tasks(1), Success, makeMapStatus("hostA", parts),
+      Seq.empty, Array.empty, createFakeTaskInfoWithId(1)))

Review comment:
       Yes, this is not required.
   `    runEvent(makeCompletionEvent(
         taskSets(0).tasks(1), Success, makeMapStatus("hostA", parts),
         Seq.empty, Array.empty, createFakeTaskInfoWithId(1)))`
   I probably must have written the test thinking that the `mergerLocs` are fetched from tasks rather it is pushed from `DAGScheduler`. Must be some confusion while writing the test. Clearly it is not required.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] venkata91 commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
venkata91 commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r788286262



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -1176,6 +1223,9 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
   // instantiate a serializer. See the followup to SPARK-36705 for more details.
   private lazy val fetchMergeResult = Utils.isPushBasedShuffleEnabled(conf, isDriver = false)
 
+  // Exposed for testing
+  val shufflePushMergerLocations = new ConcurrentHashMap[Int, Seq[BlockManagerId]]().asScala

Review comment:
       Instead of keeping track of both `<shuffleId, shuffleMergeId>` for shuffle merger locations, I think we can piggy back on `unregisterAllMapAndMergeOutput` to clear the `mergers` in `ShuffleStatus` similarly in the `updateEpoch` (like this [comment](https://github.com/apache/spark/pull/34122#discussion_r785388799)) we can clear the `shufflePushMergerLocations`. This makes the code much simpler and neat. Let me know if you think otherwise or this has any other issues.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] venkata91 commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
venkata91 commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r788287896



##########
File path: core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
##########
@@ -910,4 +910,32 @@ class MapOutputTrackerSuite extends SparkFunSuite with LocalSparkContext {
     rpcEnv.shutdown()
     slaveRpcEnv.shutdown()
   }
+
+  test("SPARK-34826: Adaptive shuffle mergers") {
+    val newConf = new SparkConf
+    newConf.set("spark.shuffle.push.based.enabled", "true")
+    newConf.set("spark.shuffle.service.enabled", "true")
+
+    // needs TorrentBroadcast so need a SparkContext
+    withSpark(new SparkContext("local", "MapOutputTrackerSuite", newConf)) { sc =>
+      val masterTracker = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
+      val rpcEnv = sc.env.rpcEnv
+      val masterEndpoint = new MapOutputTrackerMasterEndpoint(rpcEnv, masterTracker, newConf)
+      rpcEnv.stop(masterTracker.trackerEndpoint)
+      rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME, masterEndpoint)
+
+      val slaveTracker = new MapOutputTrackerWorker(newConf)

Review comment:
       Replaced the usages in the current PR and also created a [JIRA](https://issues.apache.org/jira/browse/SPARK-37964) to replace completely everywhere in `MapOutputTrackerSuite`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r790361031



##########
File path: core/src/main/scala/org/apache/spark/Dependency.scala
##########
@@ -135,6 +144,7 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
   def shuffleMergeId: Int = _shuffleMergeId
 
   def setMergerLocs(mergerLocs: Seq[BlockManagerId]): Unit = {
+    assert(shuffleMergeEnabled || shuffleMergeAllowed)

Review comment:
       `shuffleMergeAllowed` is a superset - we dont need `shuffleMergeEnabled` here (and actually, is getting set after `setMergerLocs` in this PR).
   Also, see comment [above](https://github.com/apache/spark/pull/34122/files#r785389352) on whether we need the variable (`_shuffleMergeEnabled `) at all.

##########
File path: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
##########
@@ -3686,14 +3686,13 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     completeNextStageWithFetchFailure(3, 0, shuffleDep)
     scheduler.resubmitFailedStages()
 
-    // Make sure shuffle merge is disabled for the retry
     val stage2 = scheduler.stageIdToStage(2).asInstanceOf[ShuffleMapStage]
-    assert(!stage2.shuffleDep.shuffleMergeEnabled)
+    assert(stage2.shuffleDep.shuffleMergeEnabled)

Review comment:
       Why did this behavior change ?
   I would expect it to be the same before/after this PR.
   
   For DETERMINATE stage, we should not have merge enabled for the retry if the previous attempt did finalize.

##########
File path: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
##########
@@ -4147,7 +4146,210 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     assert(finalizeTask2.delay == 0 && finalizeTask2.registerMergeResults)
   }
 
-    /**
+  test("SPARK-34826: Adaptively fetch shuffle mergers") {
+    initPushBasedShuffleConfs(conf)
+    conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 2)
+    DAGSchedulerSuite.clearMergerLocs()
+    DAGSchedulerSuite.addMergerLocs(Seq("host1"))
+    val parts = 2
+
+    val shuffleMapRdd = new MyRDD(sc, parts, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts))
+    val reduceRdd = new MyRDD(sc, parts, List(shuffleDep), tracker = mapOutputTracker)
+
+    // Submit a reduce job that depends which will create a map stage
+    submit(reduceRdd, (0 until parts).toArray)
+
+    runEvent(makeCompletionEvent(
+      taskSets(0).tasks(0), Success, makeMapStatus("hostA", parts),
+      Seq.empty, Array.empty, createFakeTaskInfoWithId(0)))
+
+    val shuffleStage1 = scheduler.stageIdToStage(0).asInstanceOf[ShuffleMapStage]
+    assert(shuffleStage1.shuffleDep.getMergerLocs.isEmpty)
+    assert(mapOutputTracker.getShufflePushMergerLocations(0).isEmpty)
+
+    DAGSchedulerSuite.addMergerLocs(Seq("host2", "host3"))
+
+    // host2 executor added event to trigger registering of shuffle merger locations
+    // as shuffle mergers are tracked separately for test
+    runEvent(ExecutorAdded("host2", "host2"))

Review comment:
       nit: Use 'exec' instead of 'host' as prefix for executor id

##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -1176,6 +1223,9 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
   // instantiate a serializer. See the followup to SPARK-36705 for more details.
   private lazy val fetchMergeResult = Utils.isPushBasedShuffleEnabled(conf, isDriver = false)
 
+  // Exposed for testing
+  val shufflePushMergerLocations = new ConcurrentHashMap[Int, Seq[BlockManagerId]]().asScala

Review comment:
       Sounds good, this looks fine.

##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -1369,24 +1370,34 @@ private[spark] class DAGScheduler(
    * locations for block push/merge by getting the historical locations of past executors.
    */
   private def prepareShuffleServicesForShuffleMapStage(stage: ShuffleMapStage): Unit = {
-    assert(stage.shuffleDep.shuffleMergeEnabled && !stage.shuffleDep.shuffleMergeFinalized)
+    assert(stage.shuffleDep.shuffleMergeAllowed && !stage.shuffleDep.shuffleMergeFinalized)
     if (stage.shuffleDep.getMergerLocs.isEmpty) {
-      val mergerLocs = sc.schedulerBackend.getShufflePushMergerLocations(
-        stage.shuffleDep.partitioner.numPartitions, stage.resourceProfileId)
-      if (mergerLocs.nonEmpty) {
-        stage.shuffleDep.setMergerLocs(mergerLocs)
-        logInfo(s"Push-based shuffle enabled for $stage (${stage.name}) with" +
-          s" ${stage.shuffleDep.getMergerLocs.size} merger locations")
-
-        logDebug("List of shuffle push merger locations " +
-          s"${stage.shuffleDep.getMergerLocs.map(_.host).mkString(", ")}")
-      } else {
-        stage.shuffleDep.setShuffleMergeEnabled(false)
-        logInfo(s"Push-based shuffle disabled for $stage (${stage.name})")
-      }
+      getAndSetShufflePushMergerLocations(stage)
+    }
+
+    if (stage.shuffleDep.shuffleMergeEnabled) {
+      logInfo(("Shuffle merge enabled before starting the stage for %s (%s) with %d" +
+        " merger locations").format(stage, stage.name, stage.shuffleDep.getMergerLocs.size))

Review comment:
       Please add the shuffle id and shuffle merge id as well for all log messages - while debugging an issue recently, we found the lack of this detail making it difficult to reason about the state.
   @otterc is planning to file a jira for this anyway, but let us address it for all log message in this PR as well.

##########
File path: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
##########
@@ -4147,7 +4146,210 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     assert(finalizeTask2.delay == 0 && finalizeTask2.registerMergeResults)
   }
 
-    /**
+  test("SPARK-34826: Adaptively fetch shuffle mergers") {
+    initPushBasedShuffleConfs(conf)
+    conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 2)
+    DAGSchedulerSuite.clearMergerLocs()
+    DAGSchedulerSuite.addMergerLocs(Seq("host1"))
+    val parts = 2
+
+    val shuffleMapRdd = new MyRDD(sc, parts, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts))
+    val reduceRdd = new MyRDD(sc, parts, List(shuffleDep), tracker = mapOutputTracker)
+
+    // Submit a reduce job that depends which will create a map stage
+    submit(reduceRdd, (0 until parts).toArray)
+
+    runEvent(makeCompletionEvent(
+      taskSets(0).tasks(0), Success, makeMapStatus("hostA", parts),
+      Seq.empty, Array.empty, createFakeTaskInfoWithId(0)))
+
+    val shuffleStage1 = scheduler.stageIdToStage(0).asInstanceOf[ShuffleMapStage]
+    assert(shuffleStage1.shuffleDep.getMergerLocs.isEmpty)
+    assert(mapOutputTracker.getShufflePushMergerLocations(0).isEmpty)
+
+    DAGSchedulerSuite.addMergerLocs(Seq("host2", "host3"))
+
+    // host2 executor added event to trigger registering of shuffle merger locations
+    // as shuffle mergers are tracked separately for test
+    runEvent(ExecutorAdded("host2", "host2"))
+
+    // Check if new shuffle merger locations are available for push or not
+    assert(mapOutputTracker.getShufflePushMergerLocations(0).size == 2)
+    assert(shuffleStage1.shuffleDep.getMergerLocs.size == 2)
+
+    // Complete remaining tasks in ShuffleMapStage 0
+    runEvent(makeCompletionEvent(taskSets(0).tasks(1), Success,
+      makeMapStatus("host1", parts), Seq.empty, Array.empty, createFakeTaskInfoWithId(1)))
+
+    completeNextResultStageWithSuccess(1, 0)
+    assert(results === Map(0 -> 42, 1 -> 42))
+
+    results.clear()
+    assertDataStructuresEmpty()
+  }
+
+  test("SPARK-34826: Adaptively fetch shuffle mergers with stage retry") {
+    initPushBasedShuffleConfs(conf)
+    conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 2)
+    DAGSchedulerSuite.clearMergerLocs()
+    DAGSchedulerSuite.addMergerLocs(Seq("host1"))
+    val parts = 2
+
+    val shuffleMapRdd1 = new MyRDD(sc, parts, Nil)
+    val shuffleDep1 = new ShuffleDependency(shuffleMapRdd1, new HashPartitioner(parts))
+    val shuffleMapRdd2 = new MyRDD(sc, parts, Nil)
+    val shuffleDep2 = new ShuffleDependency(shuffleMapRdd2, new HashPartitioner(parts))
+    val reduceRdd = new MyRDD(sc, parts, List(shuffleDep1, shuffleDep2),
+      tracker = mapOutputTracker)
+
+    // Submit a reduce job that depends which will create a map stage
+    submit(reduceRdd, (0 until parts).toArray)
+
+    val taskResults = taskSets(0).tasks.zipWithIndex.map {
+      case (_, idx) =>
+        (Success, makeMapStatus("host" + idx, parts))
+    }.toSeq
+
+    val shuffleStage1 = scheduler.stageIdToStage(0).asInstanceOf[ShuffleMapStage]
+    DAGSchedulerSuite.addMergerLocs(Seq("host2", "host3"))
+    // host2 executor added event to trigger registering of shuffle merger locations
+    // as shuffle mergers are tracked separately for test
+    runEvent(ExecutorAdded("host2", "host2"))
+    // Check if new shuffle merger locations are available for push or not
+    assert(mapOutputTracker.getShufflePushMergerLocations(0).size == 2)
+    assert(shuffleStage1.shuffleDep.getMergerLocs.size == 2)
+    val mergerLocsBeforeRetry = shuffleStage1.shuffleDep.getMergerLocs
+
+    // Clear merger locations to check if new mergers are not getting set for the
+    // retry of determinate stage
+    DAGSchedulerSuite.clearMergerLocs()
+
+    // Remove MapStatus on one of the host before the stage ends to trigger
+    // a scenario where stage 0 needs to be resubmitted upon finishing all tasks.
+    // Merge finalization should be scheduled in this case.
+    for ((result, i) <- taskResults.zipWithIndex) {
+      if (i == taskSets(0).tasks.size - 1) {
+        mapOutputTracker.removeOutputsOnHost("host0")
+      }
+      if (i < taskSets(0).tasks.size) {
+        runEvent(makeCompletionEvent(taskSets(0).tasks(i), result._1, result._2))
+      }
+    }
+    assert(shuffleStage1.shuffleDep.shuffleMergeFinalized)
+
+    DAGSchedulerSuite.addMergerLocs(Seq("host4", "host5"))
+    // host4 executor added event shouldn't reset merger locations given merger locations
+    // are already set
+    runEvent(ExecutorAdded("host4", "host4"))
+
+    // Successfully completing the retry of stage 0.
+    complete(taskSets(2), taskSets(2).tasks.zipWithIndex.map {
+      case (_, idx) =>
+        (Success, makeMapStatus("host" + idx, parts))
+    }.toSeq)
+
+    assert(shuffleStage1.shuffleDep.shuffleMergeId == 0)
+    assert(shuffleStage1.shuffleDep.getMergerLocs.size == 2)
+    assert(shuffleStage1.shuffleDep.shuffleMergeFinalized)
+    val newMergerLocs =
+      scheduler.stageIdToStage(0).asInstanceOf[ShuffleMapStage].shuffleDep.getMergerLocs
+    assert(mergerLocsBeforeRetry.sortBy(_.host) === newMergerLocs.sortBy(_.host))
+    val shuffleStage2 = scheduler.stageIdToStage(1).asInstanceOf[ShuffleMapStage]
+    complete(taskSets(1), taskSets(1).tasks.zipWithIndex.map {
+      case (_, idx) =>
+        (Success, makeMapStatus("host" + idx, parts, 10))
+    }.toSeq)
+    assert(shuffleStage2.shuffleDep.getMergerLocs.size == 2)
+    completeNextResultStageWithSuccess(2, 0)
+    assert(results === Map(0 -> 42, 1 -> 42))
+
+    results.clear()
+    assertDataStructuresEmpty()
+  }
+
+  test("SPARK-34826: Adaptively fetch shuffle mergers with stage retry for indeterminate stage") {
+    initPushBasedShuffleConfs(conf)
+    conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 2)
+    DAGSchedulerSuite.clearMergerLocs()
+    DAGSchedulerSuite.addMergerLocs(Seq("host1"))
+    val parts = 2
+
+    val shuffleMapRdd1 = new MyRDD(sc, parts, Nil, indeterminate = true)
+    val shuffleDep1 = new ShuffleDependency(shuffleMapRdd1, new HashPartitioner(parts))
+    val shuffleMapRdd2 = new MyRDD(sc, parts, Nil, indeterminate = true)
+    val shuffleDep2 = new ShuffleDependency(shuffleMapRdd2, new HashPartitioner(parts))
+    val reduceRdd = new MyRDD(sc, parts, List(shuffleDep1, shuffleDep2),
+      tracker = mapOutputTracker)
+
+    // Submit a reduce job that depends which will create a map stage
+    submit(reduceRdd, (0 until parts).toArray)
+
+    val taskResults = taskSets(0).tasks.zipWithIndex.map {
+      case (_, idx) =>
+        (Success, makeMapStatus("host" + idx, parts))
+    }.toSeq
+
+    val shuffleStage1 = scheduler.stageIdToStage(0).asInstanceOf[ShuffleMapStage]
+    DAGSchedulerSuite.addMergerLocs(Seq("host2", "host3"))
+    // host2 executor added event to trigger registering of shuffle merger locations
+    // as shuffle mergers are tracked separately for test
+    runEvent(ExecutorAdded("host2", "host2"))
+    // Check if new shuffle merger locations are available for push or not
+    assert(mapOutputTracker.getShufflePushMergerLocations(0).size == 2)
+    assert(shuffleStage1.shuffleDep.getMergerLocs.size == 2)
+    val mergerLocsBeforeRetry = shuffleStage1.shuffleDep.getMergerLocs
+
+    // Clear merger locations to check if new mergers are getting set for the
+    // retry of indeterminate stage
+    DAGSchedulerSuite.clearMergerLocs()
+
+    // Remove MapStatus on one of the host before the stage ends to trigger
+    // a scenario where stage 0 needs to be resubmitted upon finishing all tasks.
+    // Merge finalization should be scheduled in this case.
+    for ((result, i) <- taskResults.zipWithIndex) {
+      if (i == taskSets(0).tasks.size - 1) {
+        mapOutputTracker.removeOutputsOnHost("host0")
+      }
+      if (i < taskSets(0).tasks.size) {
+        runEvent(makeCompletionEvent(taskSets(0).tasks(i), result._1, result._2))
+      }
+    }
+
+    // Indeterminate stage should recompute all partitions, hence
+    // shuffleMergeFinalized should be false here
+    assert(!shuffleStage1.shuffleDep.shuffleMergeFinalized)
+
+    DAGSchedulerSuite.addMergerLocs(Seq("host4", "host5"))
+    // host4 executor added event should reset merger locations given merger locations
+    // are already reset
+    runEvent(ExecutorAdded("host4", "host4"))
+    // Successfully completing the retry of stage 0.
+    complete(taskSets(2), taskSets(2).tasks.zipWithIndex.map {
+      case (_, idx) =>
+        (Success, makeMapStatus("host" + idx, parts))
+    }.toSeq)
+
+    assert(shuffleStage1.shuffleDep.shuffleMergeId == 2)

Review comment:
       Note: We really dont care about the merge id value - except that it should be different from earlier merge id.

##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -1281,6 +1341,28 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
     }
   }
 
+  override def getShufflePushMergerLocations(shuffleId: Int): Seq[BlockManagerId] = {
+    shufflePushMergerLocations.getOrElse(shuffleId, getMergerLocations(shuffleId))
+  }
+
+  private def getMergerLocations(shuffleId: Int): Seq[BlockManagerId] = {
+    val mergers = shufflePushMergerLocations.get(shuffleId).orNull
+    if (mergers == null) {
+      fetchingLock.withLock(shuffleId) {
+        val fetchedMergers =
+          askTracker[Seq[BlockManagerId]](GetShufflePushMergerLocations(shuffleId))
+        if (fetchedMergers.nonEmpty) {
+          shufflePushMergerLocations(shuffleId) = fetchedMergers
+          fetchedMergers
+        } else {
+          Seq.empty[BlockManagerId]
+        }
+      }
+    } else {

Review comment:
       This is what I meant [here](https://github.com/apache/spark/pull/34122/#discussion_r785387856).
   
   ```suggestion
       if (mergers == null) {
         fetchingLock.withLock(shuffleId) {
           var fetchedMergers = shufflePushMergerLocations.get(shuffleId).orNull
           
           if (null == fetchedMergers) {
             fetchedMergers = askTracker[Seq[BlockManagerId]](GetShufflePushMergerLocations(shuffleId))
             if (fetchedMergers.nonEmpty) {
               shufflePushMergerLocations(shuffleId) = fetchedMergers
             } else {
               fetchedMergers = Seq.empty[BlockManagerId]
             }
           }
           fetchedMergers
       } else {
   ```
   

##########
File path: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
##########
@@ -4147,7 +4146,210 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     assert(finalizeTask2.delay == 0 && finalizeTask2.registerMergeResults)
   }
 
-    /**
+  test("SPARK-34826: Adaptively fetch shuffle mergers") {
+    initPushBasedShuffleConfs(conf)
+    conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 2)
+    DAGSchedulerSuite.clearMergerLocs()
+    DAGSchedulerSuite.addMergerLocs(Seq("host1"))
+    val parts = 2
+
+    val shuffleMapRdd = new MyRDD(sc, parts, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts))
+    val reduceRdd = new MyRDD(sc, parts, List(shuffleDep), tracker = mapOutputTracker)
+
+    // Submit a reduce job that depends which will create a map stage
+    submit(reduceRdd, (0 until parts).toArray)
+
+    runEvent(makeCompletionEvent(
+      taskSets(0).tasks(0), Success, makeMapStatus("hostA", parts),
+      Seq.empty, Array.empty, createFakeTaskInfoWithId(0)))
+
+    val shuffleStage1 = scheduler.stageIdToStage(0).asInstanceOf[ShuffleMapStage]
+    assert(shuffleStage1.shuffleDep.getMergerLocs.isEmpty)
+    assert(mapOutputTracker.getShufflePushMergerLocations(0).isEmpty)
+
+    DAGSchedulerSuite.addMergerLocs(Seq("host2", "host3"))
+
+    // host2 executor added event to trigger registering of shuffle merger locations
+    // as shuffle mergers are tracked separately for test
+    runEvent(ExecutorAdded("host2", "host2"))
+
+    // Check if new shuffle merger locations are available for push or not
+    assert(mapOutputTracker.getShufflePushMergerLocations(0).size == 2)
+    assert(shuffleStage1.shuffleDep.getMergerLocs.size == 2)
+
+    // Complete remaining tasks in ShuffleMapStage 0
+    runEvent(makeCompletionEvent(taskSets(0).tasks(1), Success,
+      makeMapStatus("host1", parts), Seq.empty, Array.empty, createFakeTaskInfoWithId(1)))
+
+    completeNextResultStageWithSuccess(1, 0)
+    assert(results === Map(0 -> 42, 1 -> 42))
+
+    results.clear()
+    assertDataStructuresEmpty()
+  }
+
+  test("SPARK-34826: Adaptively fetch shuffle mergers with stage retry") {
+    initPushBasedShuffleConfs(conf)
+    conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 2)
+    DAGSchedulerSuite.clearMergerLocs()
+    DAGSchedulerSuite.addMergerLocs(Seq("host1"))
+    val parts = 2
+
+    val shuffleMapRdd1 = new MyRDD(sc, parts, Nil)
+    val shuffleDep1 = new ShuffleDependency(shuffleMapRdd1, new HashPartitioner(parts))
+    val shuffleMapRdd2 = new MyRDD(sc, parts, Nil)
+    val shuffleDep2 = new ShuffleDependency(shuffleMapRdd2, new HashPartitioner(parts))
+    val reduceRdd = new MyRDD(sc, parts, List(shuffleDep1, shuffleDep2),
+      tracker = mapOutputTracker)
+
+    // Submit a reduce job that depends which will create a map stage
+    submit(reduceRdd, (0 until parts).toArray)
+
+    val taskResults = taskSets(0).tasks.zipWithIndex.map {
+      case (_, idx) =>
+        (Success, makeMapStatus("host" + idx, parts))
+    }.toSeq
+
+    val shuffleStage1 = scheduler.stageIdToStage(0).asInstanceOf[ShuffleMapStage]
+    DAGSchedulerSuite.addMergerLocs(Seq("host2", "host3"))
+    // host2 executor added event to trigger registering of shuffle merger locations
+    // as shuffle mergers are tracked separately for test
+    runEvent(ExecutorAdded("host2", "host2"))
+    // Check if new shuffle merger locations are available for push or not
+    assert(mapOutputTracker.getShufflePushMergerLocations(0).size == 2)
+    assert(shuffleStage1.shuffleDep.getMergerLocs.size == 2)
+    val mergerLocsBeforeRetry = shuffleStage1.shuffleDep.getMergerLocs
+
+    // Clear merger locations to check if new mergers are not getting set for the
+    // retry of determinate stage
+    DAGSchedulerSuite.clearMergerLocs()
+
+    // Remove MapStatus on one of the host before the stage ends to trigger
+    // a scenario where stage 0 needs to be resubmitted upon finishing all tasks.
+    // Merge finalization should be scheduled in this case.
+    for ((result, i) <- taskResults.zipWithIndex) {
+      if (i == taskSets(0).tasks.size - 1) {
+        mapOutputTracker.removeOutputsOnHost("host0")
+      }
+      if (i < taskSets(0).tasks.size) {
+        runEvent(makeCompletionEvent(taskSets(0).tasks(i), result._1, result._2))
+      }
+    }
+    assert(shuffleStage1.shuffleDep.shuffleMergeFinalized)
+
+    DAGSchedulerSuite.addMergerLocs(Seq("host4", "host5"))
+    // host4 executor added event shouldn't reset merger locations given merger locations
+    // are already set
+    runEvent(ExecutorAdded("host4", "host4"))
+
+    // Successfully completing the retry of stage 0.
+    complete(taskSets(2), taskSets(2).tasks.zipWithIndex.map {
+      case (_, idx) =>
+        (Success, makeMapStatus("host" + idx, parts))
+    }.toSeq)
+
+    assert(shuffleStage1.shuffleDep.shuffleMergeId == 0)
+    assert(shuffleStage1.shuffleDep.getMergerLocs.size == 2)
+    assert(shuffleStage1.shuffleDep.shuffleMergeFinalized)
+    val newMergerLocs =
+      scheduler.stageIdToStage(0).asInstanceOf[ShuffleMapStage].shuffleDep.getMergerLocs
+    assert(mergerLocsBeforeRetry.sortBy(_.host) === newMergerLocs.sortBy(_.host))
+    val shuffleStage2 = scheduler.stageIdToStage(1).asInstanceOf[ShuffleMapStage]
+    complete(taskSets(1), taskSets(1).tasks.zipWithIndex.map {
+      case (_, idx) =>
+        (Success, makeMapStatus("host" + idx, parts, 10))
+    }.toSeq)
+    assert(shuffleStage2.shuffleDep.getMergerLocs.size == 2)
+    completeNextResultStageWithSuccess(2, 0)
+    assert(results === Map(0 -> 42, 1 -> 42))
+
+    results.clear()
+    assertDataStructuresEmpty()
+  }
+
+  test("SPARK-34826: Adaptively fetch shuffle mergers with stage retry for indeterminate stage") {
+    initPushBasedShuffleConfs(conf)
+    conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 2)
+    DAGSchedulerSuite.clearMergerLocs()
+    DAGSchedulerSuite.addMergerLocs(Seq("host1"))
+    val parts = 2
+
+    val shuffleMapRdd1 = new MyRDD(sc, parts, Nil, indeterminate = true)
+    val shuffleDep1 = new ShuffleDependency(shuffleMapRdd1, new HashPartitioner(parts))
+    val shuffleMapRdd2 = new MyRDD(sc, parts, Nil, indeterminate = true)
+    val shuffleDep2 = new ShuffleDependency(shuffleMapRdd2, new HashPartitioner(parts))
+    val reduceRdd = new MyRDD(sc, parts, List(shuffleDep1, shuffleDep2),
+      tracker = mapOutputTracker)
+
+    // Submit a reduce job that depends which will create a map stage
+    submit(reduceRdd, (0 until parts).toArray)
+
+    val taskResults = taskSets(0).tasks.zipWithIndex.map {
+      case (_, idx) =>
+        (Success, makeMapStatus("host" + idx, parts))
+    }.toSeq
+
+    val shuffleStage1 = scheduler.stageIdToStage(0).asInstanceOf[ShuffleMapStage]
+    DAGSchedulerSuite.addMergerLocs(Seq("host2", "host3"))
+    // host2 executor added event to trigger registering of shuffle merger locations
+    // as shuffle mergers are tracked separately for test
+    runEvent(ExecutorAdded("host2", "host2"))
+    // Check if new shuffle merger locations are available for push or not
+    assert(mapOutputTracker.getShufflePushMergerLocations(0).size == 2)
+    assert(shuffleStage1.shuffleDep.getMergerLocs.size == 2)
+    val mergerLocsBeforeRetry = shuffleStage1.shuffleDep.getMergerLocs
+
+    // Clear merger locations to check if new mergers are getting set for the
+    // retry of indeterminate stage
+    DAGSchedulerSuite.clearMergerLocs()
+
+    // Remove MapStatus on one of the host before the stage ends to trigger
+    // a scenario where stage 0 needs to be resubmitted upon finishing all tasks.
+    // Merge finalization should be scheduled in this case.
+    for ((result, i) <- taskResults.zipWithIndex) {
+      if (i == taskSets(0).tasks.size - 1) {
+        mapOutputTracker.removeOutputsOnHost("host0")
+      }
+      if (i < taskSets(0).tasks.size) {
+        runEvent(makeCompletionEvent(taskSets(0).tasks(i), result._1, result._2))
+      }
+    }
+
+    // Indeterminate stage should recompute all partitions, hence
+    // shuffleMergeFinalized should be false here
+    assert(!shuffleStage1.shuffleDep.shuffleMergeFinalized)
+
+    DAGSchedulerSuite.addMergerLocs(Seq("host4", "host5"))
+    // host4 executor added event should reset merger locations given merger locations
+    // are already reset
+    runEvent(ExecutorAdded("host4", "host4"))

Review comment:
       nit: assert here that `getMergerLocs` is not empty ?
   (Move the `assert(shuffleStage1.shuffleDep.getMergerLocs.size == 2)` from below to here).

##########
File path: core/src/main/scala/org/apache/spark/Dependency.scala
##########
@@ -144,12 +144,16 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
     _shuffleMergedFinalized = true
   }
 
+  def shuffleMergeFinalized: Boolean = {

Review comment:
       > I refactored this in to `shuffleMergeAllowed` and `shuffleMergeEnabled` to have a clear distinction where `shuffleMergeAllowed` controls the static level of knobs like the following:
   > 
   > 1. `Is RDD barrier?`
   > 2. `Can Push shuffle be enabled?`
   > 3. `Disabling push shuffle for retry once the determinate stage attempt is finalized` etc.
   >    and `shuffleMergeEnabled` will be checked only when `shuffleMergeAllowed` is true along with that if sufficient mergers are available then it becomes `true`.
   > 
   > Given all the above, I think we still require two separate methods one for `isShuffleMergeFinalized` only checking for the `shuffleMergedFinalized` value and `numPartitions > 0` and another `isShuffleMergeFinalizedIfEnabled` checking both `shuffleMergeEnabled` and `isShuffleMergeFinalized`.
   
   
   Sounds good.
   
   > 
   > In the existing code, before the `ShuffleMapStage` starts we are checking `if (!shuffleMergeFinalized)` then only we are calling `prepareShuffleServicesForShuffleMapStage` but it is possible the previous stage never had `shuffleMergeEnabled` due to not enough mergers therefore even the retry also not be shuffle merge enabled, ideally this can be shuffle merge enabled if enough mergers are available. But for proceeding with the next stage, we need to check for `isShuffleMergeFinalizedIfEnabled` which is checking `if (shuffleMergeEnabled) isShuffleMergeFinalized else true`
   > 
   > Let me know what you think.
   
   There are two cases here:
   
     * retry for an INDETERMINATE shuffle.
       * We are recomputing all partitions, so we want to enable shuffle (based on shuffleMergeAllowed ofcourse).
         * Note: `newShuffleMergeState` will set `_shuffleMergedFinalized` to `false`.
     * retry for a DETERMINATE shuffle.
       * If shuffle was finalized, we dont want to enable merge - else we want to enable.
         * Since it is determinate shuffle, recomputation is generating identical data as previous attempt anyway).  
       * The existing codepath, which does this, should continue to work as-is with adaptive merge as well ?
   
   Given this, does anything change due to adaptive shuffle merge ? I dont see a difference, but please let me know.

##########
File path: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
##########
@@ -4147,7 +4146,210 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     assert(finalizeTask2.delay == 0 && finalizeTask2.registerMergeResults)
   }
 
-    /**
+  test("SPARK-34826: Adaptively fetch shuffle mergers") {
+    initPushBasedShuffleConfs(conf)
+    conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 2)
+    DAGSchedulerSuite.clearMergerLocs()
+    DAGSchedulerSuite.addMergerLocs(Seq("host1"))
+    val parts = 2
+
+    val shuffleMapRdd = new MyRDD(sc, parts, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts))
+    val reduceRdd = new MyRDD(sc, parts, List(shuffleDep), tracker = mapOutputTracker)
+
+    // Submit a reduce job that depends which will create a map stage
+    submit(reduceRdd, (0 until parts).toArray)
+
+    runEvent(makeCompletionEvent(
+      taskSets(0).tasks(0), Success, makeMapStatus("hostA", parts),
+      Seq.empty, Array.empty, createFakeTaskInfoWithId(0)))
+
+    val shuffleStage1 = scheduler.stageIdToStage(0).asInstanceOf[ShuffleMapStage]
+    assert(shuffleStage1.shuffleDep.getMergerLocs.isEmpty)
+    assert(mapOutputTracker.getShufflePushMergerLocations(0).isEmpty)
+
+    DAGSchedulerSuite.addMergerLocs(Seq("host2", "host3"))
+
+    // host2 executor added event to trigger registering of shuffle merger locations
+    // as shuffle mergers are tracked separately for test
+    runEvent(ExecutorAdded("host2", "host2"))
+
+    // Check if new shuffle merger locations are available for push or not
+    assert(mapOutputTracker.getShufflePushMergerLocations(0).size == 2)
+    assert(shuffleStage1.shuffleDep.getMergerLocs.size == 2)
+
+    // Complete remaining tasks in ShuffleMapStage 0
+    runEvent(makeCompletionEvent(taskSets(0).tasks(1), Success,
+      makeMapStatus("host1", parts), Seq.empty, Array.empty, createFakeTaskInfoWithId(1)))
+
+    completeNextResultStageWithSuccess(1, 0)
+    assert(results === Map(0 -> 42, 1 -> 42))
+
+    results.clear()
+    assertDataStructuresEmpty()
+  }
+
+  test("SPARK-34826: Adaptively fetch shuffle mergers with stage retry") {
+    initPushBasedShuffleConfs(conf)
+    conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 2)
+    DAGSchedulerSuite.clearMergerLocs()
+    DAGSchedulerSuite.addMergerLocs(Seq("host1"))
+    val parts = 2
+
+    val shuffleMapRdd1 = new MyRDD(sc, parts, Nil)
+    val shuffleDep1 = new ShuffleDependency(shuffleMapRdd1, new HashPartitioner(parts))
+    val shuffleMapRdd2 = new MyRDD(sc, parts, Nil)
+    val shuffleDep2 = new ShuffleDependency(shuffleMapRdd2, new HashPartitioner(parts))
+    val reduceRdd = new MyRDD(sc, parts, List(shuffleDep1, shuffleDep2),
+      tracker = mapOutputTracker)
+
+    // Submit a reduce job that depends which will create a map stage
+    submit(reduceRdd, (0 until parts).toArray)
+
+    val taskResults = taskSets(0).tasks.zipWithIndex.map {
+      case (_, idx) =>
+        (Success, makeMapStatus("host" + idx, parts))
+    }.toSeq
+
+    val shuffleStage1 = scheduler.stageIdToStage(0).asInstanceOf[ShuffleMapStage]
+    DAGSchedulerSuite.addMergerLocs(Seq("host2", "host3"))
+    // host2 executor added event to trigger registering of shuffle merger locations
+    // as shuffle mergers are tracked separately for test
+    runEvent(ExecutorAdded("host2", "host2"))
+    // Check if new shuffle merger locations are available for push or not
+    assert(mapOutputTracker.getShufflePushMergerLocations(0).size == 2)
+    assert(shuffleStage1.shuffleDep.getMergerLocs.size == 2)
+    val mergerLocsBeforeRetry = shuffleStage1.shuffleDep.getMergerLocs
+
+    // Clear merger locations to check if new mergers are not getting set for the
+    // retry of determinate stage
+    DAGSchedulerSuite.clearMergerLocs()
+
+    // Remove MapStatus on one of the host before the stage ends to trigger
+    // a scenario where stage 0 needs to be resubmitted upon finishing all tasks.
+    // Merge finalization should be scheduled in this case.
+    for ((result, i) <- taskResults.zipWithIndex) {
+      if (i == taskSets(0).tasks.size - 1) {
+        mapOutputTracker.removeOutputsOnHost("host0")
+      }
+      if (i < taskSets(0).tasks.size) {
+        runEvent(makeCompletionEvent(taskSets(0).tasks(i), result._1, result._2))
+      }
+    }
+    assert(shuffleStage1.shuffleDep.shuffleMergeFinalized)
+
+    DAGSchedulerSuite.addMergerLocs(Seq("host4", "host5"))
+    // host4 executor added event shouldn't reset merger locations given merger locations
+    // are already set
+    runEvent(ExecutorAdded("host4", "host4"))
+
+    // Successfully completing the retry of stage 0.
+    complete(taskSets(2), taskSets(2).tasks.zipWithIndex.map {
+      case (_, idx) =>
+        (Success, makeMapStatus("host" + idx, parts))
+    }.toSeq)
+
+    assert(shuffleStage1.shuffleDep.shuffleMergeId == 0)
+    assert(shuffleStage1.shuffleDep.getMergerLocs.size == 2)
+    assert(shuffleStage1.shuffleDep.shuffleMergeFinalized)
+    val newMergerLocs =
+      scheduler.stageIdToStage(0).asInstanceOf[ShuffleMapStage].shuffleDep.getMergerLocs
+    assert(mergerLocsBeforeRetry.sortBy(_.host) === newMergerLocs.sortBy(_.host))
+    val shuffleStage2 = scheduler.stageIdToStage(1).asInstanceOf[ShuffleMapStage]
+    complete(taskSets(1), taskSets(1).tasks.zipWithIndex.map {
+      case (_, idx) =>
+        (Success, makeMapStatus("host" + idx, parts, 10))
+    }.toSeq)
+    assert(shuffleStage2.shuffleDep.getMergerLocs.size == 2)
+    completeNextResultStageWithSuccess(2, 0)
+    assert(results === Map(0 -> 42, 1 -> 42))
+
+    results.clear()
+    assertDataStructuresEmpty()
+  }
+
+  test("SPARK-34826: Adaptively fetch shuffle mergers with stage retry for indeterminate stage") {
+    initPushBasedShuffleConfs(conf)
+    conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 2)
+    DAGSchedulerSuite.clearMergerLocs()
+    DAGSchedulerSuite.addMergerLocs(Seq("host1"))
+    val parts = 2
+
+    val shuffleMapRdd1 = new MyRDD(sc, parts, Nil, indeterminate = true)
+    val shuffleDep1 = new ShuffleDependency(shuffleMapRdd1, new HashPartitioner(parts))
+    val shuffleMapRdd2 = new MyRDD(sc, parts, Nil, indeterminate = true)
+    val shuffleDep2 = new ShuffleDependency(shuffleMapRdd2, new HashPartitioner(parts))
+    val reduceRdd = new MyRDD(sc, parts, List(shuffleDep1, shuffleDep2),
+      tracker = mapOutputTracker)
+
+    // Submit a reduce job that depends which will create a map stage
+    submit(reduceRdd, (0 until parts).toArray)
+
+    val taskResults = taskSets(0).tasks.zipWithIndex.map {
+      case (_, idx) =>
+        (Success, makeMapStatus("host" + idx, parts))
+    }.toSeq
+
+    val shuffleStage1 = scheduler.stageIdToStage(0).asInstanceOf[ShuffleMapStage]
+    DAGSchedulerSuite.addMergerLocs(Seq("host2", "host3"))
+    // host2 executor added event to trigger registering of shuffle merger locations
+    // as shuffle mergers are tracked separately for test
+    runEvent(ExecutorAdded("host2", "host2"))
+    // Check if new shuffle merger locations are available for push or not
+    assert(mapOutputTracker.getShufflePushMergerLocations(0).size == 2)
+    assert(shuffleStage1.shuffleDep.getMergerLocs.size == 2)
+    val mergerLocsBeforeRetry = shuffleStage1.shuffleDep.getMergerLocs
+
+    // Clear merger locations to check if new mergers are getting set for the
+    // retry of indeterminate stage
+    DAGSchedulerSuite.clearMergerLocs()
+
+    // Remove MapStatus on one of the host before the stage ends to trigger
+    // a scenario where stage 0 needs to be resubmitted upon finishing all tasks.
+    // Merge finalization should be scheduled in this case.
+    for ((result, i) <- taskResults.zipWithIndex) {
+      if (i == taskSets(0).tasks.size - 1) {
+        mapOutputTracker.removeOutputsOnHost("host0")
+      }
+      if (i < taskSets(0).tasks.size) {

Review comment:
       This condition is always true - did you mean something else ?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] venkata91 commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
venkata91 commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r780452227



##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -2487,6 +2501,21 @@ private[spark] class DAGScheduler(
       executorFailureEpoch -= execId
     }
     shuffleFileLostEpoch -= execId
+
+    if (pushBasedShuffleEnabled) {

Review comment:
       Agree. Let me see how we can send an event when a new blockManager's host which is not already part of existing shuffleMergers and correspondingly fire an event to DAGScheduler




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] venkata91 commented on a change in pull request #34122: [SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
venkata91 commented on a change in pull request #34122:
URL: https://github.com/apache/spark/pull/34122#discussion_r779751926



##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -2487,6 +2501,21 @@ private[spark] class DAGScheduler(
       executorFailureEpoch -= execId
     }
     shuffleFileLostEpoch -= execId
+
+    if (pushBasedShuffleEnabled) {

Review comment:
       Currently this is done as part of `executorAdded` event, do you think we should have a separate event which gets triggered only when the new executor added host is not already part of the existing `mergerLocs`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #34122: [WIP][SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34122:
URL: https://github.com/apache/spark/pull/34122#issuecomment-928425690






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #34122: [WIP][SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #34122:
URL: https://github.com/apache/spark/pull/34122#issuecomment-928579469


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/143662/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #34122: [WIP][SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34122:
URL: https://github.com/apache/spark/pull/34122#issuecomment-928547000


   Kubernetes integration test status failure
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/48175/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins removed a comment on pull request #34122: [WIP][SPARK-34826][SHUFFLE] Adaptively fetch shuffle mergers for push based shuffle

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #34122:
URL: https://github.com/apache/spark/pull/34122#issuecomment-928579469


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/143662/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org