You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by "summaryzb (via GitHub)" <gi...@apache.org> on 2023/10/08 14:39:09 UTC

[PR] [SPARK-45463][CORE][SHUFFLE] Support reliable store with specified executorId [spark]

summaryzb opened a new pull request, #43280:
URL: https://github.com/apache/spark/pull/43280

   ### What changes were proposed in this pull request?
   According to weather the map data shuffled by the lost executor are all reliable stored, `ShuffleDriverComponent` can support reliable store with specified executorId
   
   ### Why are the changes needed?
   Downstream projects may have different shuffle policy to adapt to cluster loads for different stages, for example or Apache Uniffle with Gluten or Apache Celeborn. In this situation, ShuffleDriverComponent should use the mapTrackerMaster to decide weather support reliable storage by the specified executorId
   
   ### Does this PR introduce _any_ user-facing change?
   Enhances the `ShuffleDriverComponents` API, but defaults to current behavior.
   
   ### How was this patch tested?
   Unit tests
   
   ### Was this patch authored or co-authored using generative AI tooling?
   No
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45463][CORE][SHUFFLE] Support reliable store with specified executorId [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #43280:
URL: https://github.com/apache/spark/pull/43280#discussion_r1351195382


##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -2580,7 +2580,7 @@ private[spark] class DAGScheduler(
     // if the cluster manager explicitly tells us that the entire worker was lost, then
     // we know to unregister shuffle output.  (Note that "worker" specifically refers to the process
     // from a Standalone cluster, where the shuffle service lives in the Worker.)
-    val fileLost = !sc.shuffleDriverComponents.supportsReliableStorage() &&
+    val fileLost = !sc.shuffleDriverComponents.supportsReliableStorage(execId) &&

Review Comment:
   I have the same issue. https://github.com/apache/spark/pull/43280#discussion_r1349985555



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45463][CORE][SHUFFLE] Support reliable store with specified executorId [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #43280:
URL: https://github.com/apache/spark/pull/43280#discussion_r1350810420


##########
core/src/main/java/org/apache/spark/shuffle/api/ShuffleDriverComponents.java:
##########
@@ -66,8 +66,19 @@ default void removeShuffle(int shuffleId, boolean blocking) {}
    * Does this shuffle component support reliable storage - external to the lifecycle of the
    * executor host ? For example, writing shuffle data to a distributed filesystem or
    * persisting it in a remote shuffle service.
+   *
+   * Note: This method is for compatibility with older implementations,
+   * the newer implementation should not use this method.
    */
   default boolean supportsReliableStorage() {
     return false;
   }
+
+  /**
+   * Does this executor support reliable storage for all shuffle data.
+   * @param execId The executor id, null for validation use only.

Review Comment:
   executor id should be valid - and spark will pass a valid id when using this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45463][CORE][SHUFFLE] Support reliable store with specified executorId [spark]

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on PR #43280:
URL: https://github.com/apache/spark/pull/43280#issuecomment-1752266964

   cc @mridulm @Ngone51 FYI


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45463][CORE][SHUFFLE] Support reliable store with specified executorId [spark]

Posted by "summaryzb (via GitHub)" <gi...@apache.org>.
summaryzb commented on code in PR #43280:
URL: https://github.com/apache/spark/pull/43280#discussion_r1349935364


##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -2580,7 +2580,7 @@ private[spark] class DAGScheduler(
     // if the cluster manager explicitly tells us that the entire worker was lost, then
     // we know to unregister shuffle output.  (Note that "worker" specifically refers to the process
     // from a Standalone cluster, where the shuffle service lives in the Worker.)
-    val fileLost = !sc.shuffleDriverComponents.supportsReliableStorage() &&
+    val fileLost = !sc.shuffleDriverComponents.supportsReliableStorage(execId) &&

Review Comment:
   Sounds reasonable, fix



##########
core/src/main/java/org/apache/spark/shuffle/api/ShuffleDriverComponents.java:
##########
@@ -62,12 +62,23 @@ default void registerShuffle(int shuffleId) {}
    */
   default void removeShuffle(int shuffleId, boolean blocking) {}
 
+  /**
+   * This method is for compatibility with older implementations
+   * the implementation should just directly return true
+   * when use distributed filesystem or in a disaggregated shuffle cluster
+   */

Review Comment:
   ok



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45463][CORE][SHUFFLE] Support reliable store with specified executorId [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #43280:
URL: https://github.com/apache/spark/pull/43280#discussion_r1349906239


##########
core/src/main/java/org/apache/spark/shuffle/api/ShuffleDriverComponents.java:
##########
@@ -62,12 +62,23 @@ default void registerShuffle(int shuffleId) {}
    */
   default void removeShuffle(int shuffleId, boolean blocking) {}
 
+  /**
+   * This method is for compatibility with older implementations
+   * the implementation should just directly return true
+   * when use distributed filesystem or in a disaggregated shuffle cluster
+   */

Review Comment:
   Shall we keep the origin comments here and add these new comments as `Note: ...` at last.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45463][CORE][SHUFFLE] Support reliable store with specified executorId [spark]

Posted by "summaryzb (via GitHub)" <gi...@apache.org>.
summaryzb commented on code in PR #43280:
URL: https://github.com/apache/spark/pull/43280#discussion_r1350032071


##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -2580,7 +2580,8 @@ private[spark] class DAGScheduler(
     // if the cluster manager explicitly tells us that the entire worker was lost, then
     // we know to unregister shuffle output.  (Note that "worker" specifically refers to the process
     // from a Standalone cluster, where the shuffle service lives in the Worker.)
-    val fileLost = !sc.shuffleDriverComponents.supportsReliableStorage() &&
+    val fileLost = !(sc.shuffleDriverComponents.supportsReliableStorage() &&
+      sc.shuffleDriverComponents.supportsReliableStorage(execId)) &&

Review Comment:
   No. If we use `(supportsReliableStorage() || supportsReliableStorage(execId))`, newer implementation will always return true when use disaggregated shuffle cluster.
   
   In old implementation `supportsReliableStorage(execId)` always get true since not implemented yet, thus the same behavior as just call `supportsReliableStorage()`
   In newer implementation  `supportsReliableStorage()` should direct return  boolean, if it got true then `supportsReliableStorage(execId)` do real jugement



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45463][CORE][SHUFFLE] Support reliable store with specified executorId [spark]

Posted by "summaryzb (via GitHub)" <gi...@apache.org>.
summaryzb commented on code in PR #43280:
URL: https://github.com/apache/spark/pull/43280#discussion_r1350091300


##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -2580,7 +2580,8 @@ private[spark] class DAGScheduler(
     // if the cluster manager explicitly tells us that the entire worker was lost, then
     // we know to unregister shuffle output.  (Note that "worker" specifically refers to the process
     // from a Standalone cluster, where the shuffle service lives in the Worker.)
-    val fileLost = !sc.shuffleDriverComponents.supportsReliableStorage() &&
+    val fileLost = !(sc.shuffleDriverComponents.supportsReliableStorage() &&
+      sc.shuffleDriverComponents.supportsReliableStorage(execId)) &&

Review Comment:
   > Uh, the default value is true. Could we make it is false in default?
   
   Sorry I didn't see the other way to solve compatible issue, if make it false by default, shall you illustrate more



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45463][CORE][SHUFFLE] Support reliable store with specified executorId [spark]

Posted by "summaryzb (via GitHub)" <gi...@apache.org>.
summaryzb commented on code in PR #43280:
URL: https://github.com/apache/spark/pull/43280#discussion_r1350163857


##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -2580,7 +2580,8 @@ private[spark] class DAGScheduler(
     // if the cluster manager explicitly tells us that the entire worker was lost, then
     // we know to unregister shuffle output.  (Note that "worker" specifically refers to the process
     // from a Standalone cluster, where the shuffle service lives in the Worker.)
-    val fileLost = !sc.shuffleDriverComponents.supportsReliableStorage() &&
+    val fileLost = !(sc.shuffleDriverComponents.supportsReliableStorage() &&
+      sc.shuffleDriverComponents.supportsReliableStorage(execId)) &&

Review Comment:
   > If `supportsReliableStorage(execId)` returns false in default, so `!(supportsReliableStorage() || supportsReliableStorage(execId))` compatible with before.
   
   what about the newer implementation
   
   >  If we use (supportsReliableStorage() || supportsReliableStorage(execId)), newer implementation will always return true when use disaggregated shuffle cluster.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45463][CORE][SHUFFLE] Support reliable store with specified executorId [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #43280:
URL: https://github.com/apache/spark/pull/43280#discussion_r1350193069


##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -2580,7 +2580,8 @@ private[spark] class DAGScheduler(
     // if the cluster manager explicitly tells us that the entire worker was lost, then
     // we know to unregister shuffle output.  (Note that "worker" specifically refers to the process
     // from a Standalone cluster, where the shuffle service lives in the Worker.)
-    val fileLost = !sc.shuffleDriverComponents.supportsReliableStorage() &&
+    val fileLost = !(sc.shuffleDriverComponents.supportsReliableStorage() &&
+      sc.shuffleDriverComponents.supportsReliableStorage(execId)) &&

Review Comment:
   `supportsReliableStorage()` returns true, means file not lost. right ?
   `supportsReliableStorage(execId)` returns true, means file not lost too. right ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45463][CORE][SHUFFLE] Support reliable store with specified executorId [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #43280:
URL: https://github.com/apache/spark/pull/43280#discussion_r1350787954


##########
core/src/main/scala/org/apache/spark/SparkContext.scala:
##########
@@ -661,7 +661,7 @@ class SparkContext(config: SparkConf) extends Logging {
             Some(new ExecutorAllocationManager(
               schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf,
               cleaner = cleaner, resourceProfileManager = resourceProfileManager,
-              reliableShuffleStorage = _shuffleDriverComponents.supportsReliableStorage()))
+              reliableShuffleStorage = _shuffleDriverComponents.supportsReliableStorage(null)))

Review Comment:
   You can revert this



##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -2580,7 +2580,7 @@ private[spark] class DAGScheduler(
     // if the cluster manager explicitly tells us that the entire worker was lost, then
     // we know to unregister shuffle output.  (Note that "worker" specifically refers to the process
     // from a Standalone cluster, where the shuffle service lives in the Worker.)
-    val fileLost = !sc.shuffleDriverComponents.supportsReliableStorage() &&
+    val fileLost = !sc.shuffleDriverComponents.supportsReliableStorage(execId) &&

Review Comment:
   This has not been addressed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45463][CORE][SHUFFLE] Support reliable store with specified executorId [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #43280:
URL: https://github.com/apache/spark/pull/43280#discussion_r1350249515


##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -2580,7 +2580,8 @@ private[spark] class DAGScheduler(
     // if the cluster manager explicitly tells us that the entire worker was lost, then
     // we know to unregister shuffle output.  (Note that "worker" specifically refers to the process
     // from a Standalone cluster, where the shuffle service lives in the Worker.)
-    val fileLost = !sc.shuffleDriverComponents.supportsReliableStorage() &&
+    val fileLost = !(sc.shuffleDriverComponents.supportsReliableStorage() &&
+      sc.shuffleDriverComponents.supportsReliableStorage(execId)) &&

Review Comment:
   I'm a bit confused.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45463][CORE][SHUFFLE] Support reliable store with specified executorId [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #43280:
URL: https://github.com/apache/spark/pull/43280#discussion_r1349769875


##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -2580,7 +2580,7 @@ private[spark] class DAGScheduler(
     // if the cluster manager explicitly tells us that the entire worker was lost, then
     // we know to unregister shuffle output.  (Note that "worker" specifically refers to the process
     // from a Standalone cluster, where the shuffle service lives in the Worker.)
-    val fileLost = !sc.shuffleDriverComponents.supportsReliableStorage() &&
+    val fileLost = !sc.shuffleDriverComponents.supportsReliableStorage(execId) &&

Review Comment:
   This is a backwardly incompatible - make it an `&&` of existing and new (`!supportsReliableStorage() && ! supportsReliableStorage(execId) ...`).
   Same for other places as well.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45463][CORE][SHUFFLE] Support reliable store with specified executorId [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on PR #43280:
URL: https://github.com/apache/spark/pull/43280#issuecomment-1753930980

   Btw, it is not very clear to me how this functionality is going to be leveraged - unlers you are relying on resource profiles to tag which executors should leverage reliable shuffle and which should not.
   Spark will run tasks across stages on an executor - so there is no expectation of executor id to stage mapping (except via resource profiles).
   
   Please add more details in the PR description on how this can be leveraged reliably.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45463][CORE][SHUFFLE] Support reliable store with specified executorId [spark]

Posted by "summaryzb (via GitHub)" <gi...@apache.org>.
summaryzb commented on code in PR #43280:
URL: https://github.com/apache/spark/pull/43280#discussion_r1350219631


##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -2580,7 +2580,8 @@ private[spark] class DAGScheduler(
     // if the cluster manager explicitly tells us that the entire worker was lost, then
     // we know to unregister shuffle output.  (Note that "worker" specifically refers to the process
     // from a Standalone cluster, where the shuffle service lives in the Worker.)
-    val fileLost = !sc.shuffleDriverComponents.supportsReliableStorage() &&
+    val fileLost = !(sc.shuffleDriverComponents.supportsReliableStorage() &&
+      sc.shuffleDriverComponents.supportsReliableStorage(execId)) &&

Review Comment:
   yes, When use disaggregated shuffle cluster `supportsReliableStorage(execId)` may return false since some local shuffle stages, while supportsReliableStorage() returns true



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45463][CORE][SHUFFLE] Support reliable store with specified executorId [spark]

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] closed pull request #43280: [SPARK-45463][CORE][SHUFFLE] Support reliable store with specified executorId
URL: https://github.com/apache/spark/pull/43280


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45463][CORE][SHUFFLE] Support reliable store with specified executorId [spark]

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #43280:
URL: https://github.com/apache/spark/pull/43280#issuecomment-1899427729

   We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
   If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45463][CORE][SHUFFLE] Support reliable store with specified executorId [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #43280:
URL: https://github.com/apache/spark/pull/43280#discussion_r1350055179


##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -2580,7 +2580,8 @@ private[spark] class DAGScheduler(
     // if the cluster manager explicitly tells us that the entire worker was lost, then
     // we know to unregister shuffle output.  (Note that "worker" specifically refers to the process
     // from a Standalone cluster, where the shuffle service lives in the Worker.)
-    val fileLost = !sc.shuffleDriverComponents.supportsReliableStorage() &&
+    val fileLost = !(sc.shuffleDriverComponents.supportsReliableStorage() &&
+      sc.shuffleDriverComponents.supportsReliableStorage(execId)) &&

Review Comment:
   Uh, the default value is true. Could we make it is false in default?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45463][CORE][SHUFFLE] Support reliable store with specified executorId [spark]

Posted by "summaryzb (via GitHub)" <gi...@apache.org>.
summaryzb commented on code in PR #43280:
URL: https://github.com/apache/spark/pull/43280#discussion_r1350349359


##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -2580,7 +2580,8 @@ private[spark] class DAGScheduler(
     // if the cluster manager explicitly tells us that the entire worker was lost, then
     // we know to unregister shuffle output.  (Note that "worker" specifically refers to the process
     // from a Standalone cluster, where the shuffle service lives in the Worker.)
-    val fileLost = !sc.shuffleDriverComponents.supportsReliableStorage() &&
+    val fileLost = !(sc.shuffleDriverComponents.supportsReliableStorage() &&
+      sc.shuffleDriverComponents.supportsReliableStorage(execId)) &&

Review Comment:
   Ur right, it's really confused. How about now



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45463][CORE][SHUFFLE] Support reliable store with specified executorId [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #43280:
URL: https://github.com/apache/spark/pull/43280#discussion_r1349985799


##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -1046,7 +1046,8 @@ private[spark] class TaskSetManager(
     // can destroy the whole host). The reason is the next stage wouldn't be able to fetch the
     // data from this dead executor so we would need to rerun these tasks on other executors.
     val maybeShuffleMapOutputLoss = isShuffleMapTasks &&
-      !sched.sc.shuffleDriverComponents.supportsReliableStorage() &&
+      !(sched.sc.shuffleDriverComponents.supportsReliableStorage() &&
+        sched.sc.shuffleDriverComponents.supportsReliableStorage(execId)) &&

Review Comment:
   ditto



##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -2580,7 +2580,8 @@ private[spark] class DAGScheduler(
     // if the cluster manager explicitly tells us that the entire worker was lost, then
     // we know to unregister shuffle output.  (Note that "worker" specifically refers to the process
     // from a Standalone cluster, where the shuffle service lives in the Worker.)
-    val fileLost = !sc.shuffleDriverComponents.supportsReliableStorage() &&
+    val fileLost = !(sc.shuffleDriverComponents.supportsReliableStorage() &&
+      sc.shuffleDriverComponents.supportsReliableStorage(execId)) &&

Review Comment:
   What's the logic here? Is it `!(supportsReliableStorage() || supportsReliableStorage(execId))`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45463][CORE][SHUFFLE] Support reliable store with specified executorId [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #43280:
URL: https://github.com/apache/spark/pull/43280#discussion_r1349906239


##########
core/src/main/java/org/apache/spark/shuffle/api/ShuffleDriverComponents.java:
##########
@@ -62,12 +62,23 @@ default void registerShuffle(int shuffleId) {}
    */
   default void removeShuffle(int shuffleId, boolean blocking) {}
 
+  /**
+   * This method is for compatibility with older implementations
+   * the implementation should just directly return true
+   * when use distributed filesystem or in a disaggregated shuffle cluster
+   */

Review Comment:
   Shall we keep the origin comments here and add these new comments as `Note: ...` at last?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45463][CORE][SHUFFLE] Support reliable store with specified executorId [spark]

Posted by "summaryzb (via GitHub)" <gi...@apache.org>.
summaryzb commented on code in PR #43280:
URL: https://github.com/apache/spark/pull/43280#discussion_r1350091300


##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -2580,7 +2580,8 @@ private[spark] class DAGScheduler(
     // if the cluster manager explicitly tells us that the entire worker was lost, then
     // we know to unregister shuffle output.  (Note that "worker" specifically refers to the process
     // from a Standalone cluster, where the shuffle service lives in the Worker.)
-    val fileLost = !sc.shuffleDriverComponents.supportsReliableStorage() &&
+    val fileLost = !(sc.shuffleDriverComponents.supportsReliableStorage() &&
+      sc.shuffleDriverComponents.supportsReliableStorage(execId)) &&

Review Comment:
   > Uh, the default value is true. Could we make it is false in default?
   Sorry I didn't see the other way to solve compatible issue, if make it false by default, shall you illustrate more



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45463][CORE][SHUFFLE] Support reliable store with specified executorId [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #43280:
URL: https://github.com/apache/spark/pull/43280#discussion_r1350110656


##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -2580,7 +2580,8 @@ private[spark] class DAGScheduler(
     // if the cluster manager explicitly tells us that the entire worker was lost, then
     // we know to unregister shuffle output.  (Note that "worker" specifically refers to the process
     // from a Standalone cluster, where the shuffle service lives in the Worker.)
-    val fileLost = !sc.shuffleDriverComponents.supportsReliableStorage() &&
+    val fileLost = !(sc.shuffleDriverComponents.supportsReliableStorage() &&
+      sc.shuffleDriverComponents.supportsReliableStorage(execId)) &&

Review Comment:
   If `supportsReliableStorage(execId)` returns false in default, so `!(supportsReliableStorage() || supportsReliableStorage(execId))` compatible with before.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45463][CORE][SHUFFLE] Support reliable store with specified executorId [spark]

Posted by "summaryzb (via GitHub)" <gi...@apache.org>.
summaryzb commented on code in PR #43280:
URL: https://github.com/apache/spark/pull/43280#discussion_r1350167424


##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -2580,7 +2580,8 @@ private[spark] class DAGScheduler(
     // if the cluster manager explicitly tells us that the entire worker was lost, then
     // we know to unregister shuffle output.  (Note that "worker" specifically refers to the process
     // from a Standalone cluster, where the shuffle service lives in the Worker.)
-    val fileLost = !sc.shuffleDriverComponents.supportsReliableStorage() &&
+    val fileLost = !(sc.shuffleDriverComponents.supportsReliableStorage() &&
+      sc.shuffleDriverComponents.supportsReliableStorage(execId)) &&

Review Comment:
   When use disaggregated shuffle cluster, `supportsReliableStorage()` must be true since it's used in `ExecutorAllocationManager.validatings`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45463][CORE][SHUFFLE] Support reliable store with specified executorId [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #43280:
URL: https://github.com/apache/spark/pull/43280#discussion_r1350809807


##########
core/src/main/java/org/apache/spark/shuffle/api/ShuffleDriverComponents.java:
##########
@@ -66,8 +66,19 @@ default void removeShuffle(int shuffleId, boolean blocking) {}
    * Does this shuffle component support reliable storage - external to the lifecycle of the
    * executor host ? For example, writing shuffle data to a distributed filesystem or
    * persisting it in a remote shuffle service.
+   *
+   * Note: This method is for compatibility with older implementations,
+   * the newer implementation should not use this method.

Review Comment:
   This comment addition is incorrect.
   A shuffle implementation can either choose to be reliable irrespective of executor id, or choose to be reliable only for specific subset of executor id's - that is an implementation choice of the implementation, and not a function of newer vs older



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org