You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/02/28 15:59:44 UTC

[GitHub] [spark] abhishekd0907 opened a new pull request #35683: [SPARK-30873][CORE][YARN] Add support for YARN decommissioning & pre-emption

abhishekd0907 opened a new pull request #35683:
URL: https://github.com/apache/spark/pull/35683


   <!--
   Thanks for sending a pull request!  Here are some tips for you:
     1. If this is your first time, please read our contributor guidelines: https://spark.apache.org/contributing.html
     2. Ensure you have added or run the appropriate tests for your PR: https://spark.apache.org/developer-tools.html
     3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][SPARK-XXXX] Your PR title ...'.
     4. Be sure to keep the PR description updated to reflect all changes.
     5. Please write your PR title to summarize what this PR proposes.
     6. If possible, provide a concise example to reproduce the issue for a faster review.
     7. If you want to add a new configuration, please read the guideline first for naming configurations in
        'core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala'.
     8. If you want to add or modify an error type or message, please read the guideline first in
        'core/src/main/resources/error/README.md'.
   -->
   
   ### What changes were proposed in this pull request?
   
   This PR tries to make spark leverage the knowledge of the node loss in future, and tries to adjust the scheduling of tasks to minimise the impact on the application.
   
   This PR Adds support for YARN decommissioning & pre-emption
   
   Prevent new tasks from starting on any executors on decommissioning Nodes and offload shuffle blocks and cached RDD blocks.
   
   ### Why are the changes needed?
   Add the support to handle the Node Decommissioning for Yarn cluster manger in Spark
   
   ### Does this PR introduce any user-facing change?
   NO
   
   ### How was this patch tested?
   Added the Unit Test and run the manual test.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm edited a comment on pull request #35683: [SPARK-30835][CORE][YARN] Add support for YARN decommissioning & pre-emption

Posted by GitBox <gi...@apache.org>.
mridulm edited a comment on pull request #35683:
URL: https://github.com/apache/spark/pull/35683#issuecomment-1082010575


   > If decommissioning is enabled, shuffle blocks are already migrated to another node before the executor exits. However, this is best effort. So shuffle data might be left if shuffle blocks migration is left incomplete or the executor doesn't exit cleanly. Can you please clarify what are the expectations for shuffle data when a node is put in decommissioning state?
   
   
   I am not sure I follow.
   In k8s, where there is no shuffle service - it makes sense to move shuffle blocks when executor is getting decommissioned.
   In yarn, particularly with dynamic resource allocation (but even without it), it is very common for executors to exit - while shuffle service continues to serve the shuffle blocks generated on that node, which might have no active executors for application on them.
   You can have quite wild swings in executor allocations (ramp up to 1000s of executors in 10s of seconds, and ramp down to very low number equally fast).
   
   We should not be moving shuffle blocks when an executor is exiting - only when the node is actually getting decommissioned (they might be the same for k8s, not for yarn).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhouyejoe commented on pull request #35683: [SPARK-30873][CORE][YARN] Add support for YARN decommissioning & pre-emption

Posted by GitBox <gi...@apache.org>.
zhouyejoe commented on pull request #35683:
URL: https://github.com/apache/spark/pull/35683#issuecomment-1075979415


   Not quite sure about what RM will respond to the AM's allocateResource call. Will the response include resources on nodes that are being decommissioned?IIUC, RM should not allocate new containers on those nodes after they get commissioned. Or RM will actually return to AM, that some of the nodes are getting decommissioned, where RM did allocate resources on those nodes for earlier allocateResource calls? If so, please add a comment in `handleNodesInDecommissioningState`. Thanks.
   Indeed we should also better handling the node decommission for push based shuffle, it won't trigger retry though as it will fall back. But this case also applies to ESS cases where they have the unmerged shuffle files, but it will trigger stage retry to regenerate the unmerged shuffle data in other nodes. 
   We did get bothered, during some NM updates that couldn't go with work preserving restart. We issued the decommission to a few nodes in batches, and waited until the shuffle data under tmp got cleaned up, to make sure the whole cluster NM updates didn't trigger retries. This does increase the ops overhead.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] abhishekd0907 commented on pull request #35683: [SPARK-30835][CORE][YARN] Add support for YARN decommissioning & pre-emption

Posted by GitBox <gi...@apache.org>.
abhishekd0907 commented on pull request #35683:
URL: https://github.com/apache/spark/pull/35683#issuecomment-1083126883


    > I am not sure I follow. In k8s, where there is no shuffle service - it makes sense to move shuffle blocks when executor is getting decommissioned. In yarn, particularly with dynamic resource allocation (but even without it), it is very common for executors to exit - while shuffle service continues to serve the shuffle blocks generated on that node, which might have no active executors for application on them. You can have quite wild swings in executor allocations (ramp up to 1000s of executors in 10s of seconds, and ramp down to very low number equally fast).
   > 
   > We should not be moving shuffle blocks when an executor is exiting - only when the node is actually getting decommissioned (they might be the same for k8s, not for yarn).
   
   @mridulm 
   I have followed the approach discussed in this [Design DOC](https://docs.google.com/document/d/1xVO1b6KAwdUhjEJBolVPl9C6sLj7oOveErwDSYdT-pE/edit?usp=sharing) for this PR and it doesn't discuss any risks when marking executors which are running on Decommissionining nodes as Decommissioning or migrating shuffle blocks to executors on other nodes. Please let me know if I am missing something. Below is the discussion for YARN:
   
   > application master receives the pre-emption/decommission message first and needs to send the message to the executor. However the general principle and mechanism remains the same. The DecommissionSelf message has been added to support this. For performance, we can, when receiving the message from the application master, immediately add the associated executor(s) to the decommissioning hashmap.
   Spark's application master can receive a message from YARN indicating that one of its executors is being preempted/decommissioned or otherwise shut down. After receiving this message the application master can send the DecommissionSelf to the executor that is going to be shut down. From there the rest of the message flow will look like that of standalone/k8s. 
   
   I belive there are use cases to handle graceful decommissioning of executors if they are running on a node which has been marked as decommissioning. In many public cloud environments, the node loss (in case of AWS SpotLoss, or  GCP preemptible VMs) is a planned and informed activity. The cloud provider intimates the Yarn cluster manager about the possible loss of node in advance.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon commented on pull request #35683: [SPARK-30873][CORE][YARN] Add support for YARN decommissioning & pre-emption

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on pull request #35683:
URL: https://github.com/apache/spark/pull/35683#issuecomment-1056049360


   cc @Ngone51 too FYI


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on pull request #35683: [SPARK-30835][CORE][YARN] Add support for YARN decommissioning & pre-emption

Posted by GitBox <gi...@apache.org>.
mridulm commented on pull request #35683:
URL: https://github.com/apache/spark/pull/35683#issuecomment-1083361565


   To clarify, I can see a need for supporting node decomissioning in yarn - for both on-prem clusters and for cloud deployments - but the solution will need to take into account what the cluster manager supports.
   As I detailed above, yarn supports shuffle services and executors come and go on different nodes.
   
   Decomissioning will need to account for it, which is a different model compared to k8s support.
   
   @tgravescs might have insights into how Tez/MR handles decomissioning w.r.t shuffle.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] attilapiros commented on a change in pull request #35683: [SPARK-30873][CORE][YARN] Add support for YARN decommissioning & pre-emption

Posted by GitBox <gi...@apache.org>.
attilapiros commented on a change in pull request #35683:
URL: https://github.com/apache/spark/pull/35683#discussion_r829185089



##########
File path: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
##########
@@ -419,6 +422,30 @@ private[yarn] class YarnAllocator(
     }
   }
 
+  private def decommissionNodes(allocateResponse: AllocateResponse): Unit = {
+    if (sparkConf.get(YARN_EXECUTOR_DECOMMISSION_ENABLED)) {

Review comment:
       I would move this `if` into the `allocateResources` method as early as possibly (as it would improve code readability: saves an unnecessary jump).

##########
File path: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
##########
@@ -419,6 +422,30 @@ private[yarn] class YarnAllocator(
     }
   }
 
+  private def decommissionNodes(allocateResponse: AllocateResponse): Unit = {

Review comment:
       rename:
   ```suggestion
     private def handleNodesInDecommissioningState(allocateResponse: AllocateResponse): Unit = {
   ```

##########
File path: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
##########
@@ -419,6 +422,30 @@ private[yarn] class YarnAllocator(
     }
   }
 
+  private def decommissionNodes(allocateResponse: AllocateResponse): Unit = {
+    if (sparkConf.get(YARN_EXECUTOR_DECOMMISSION_ENABLED)) {
+      try {
+        allocateResponse.getUpdatedNodes.asScala.filter(shouldDecommissionNode).

Review comment:
       Please pay attention to the indentation:
   
   ```
           allocateResponse.getUpdatedNodes.asScala.filter(_.getNodeState == NodeState.DECOMMISSIONING).
   ```

##########
File path: resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
##########
@@ -706,4 +714,74 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
       sparkConf.set(MEMORY_OFFHEAP_SIZE, originalOffHeapSize)
     }
   }
+
+  test("Test YARN container decommissioning") {
+    sparkConf.set(YARN_EXECUTOR_DECOMMISSION_ENABLED.key, "true")

Review comment:
       I would move this into the beginning of the suite and remove the cleanup.
   
   As in case of an exception the cleanup(`sparkConf.remove(YARN_EXECUTOR_DECOMMISSION_ENABLED.key)`) won't be called at all and there is no test where this flag would disturb the run.
   
   
   

##########
File path: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
##########
@@ -419,6 +422,30 @@ private[yarn] class YarnAllocator(
     }
   }
 
+  private def decommissionNodes(allocateResponse: AllocateResponse): Unit = {
+    if (sparkConf.get(YARN_EXECUTOR_DECOMMISSION_ENABLED)) {
+      try {
+        allocateResponse.getUpdatedNodes.asScala.filter(shouldDecommissionNode).
+          foreach(node => driverRef.send(DecommissionExecutorsOnHost(getHostAddress(node))))
+      } catch {
+        case e: Exception => logError("Node decommissioning failed", e)
+      }
+    }
+  }
+
+  private def shouldDecommissionNode(nodeReport: NodeReport): Boolean = {
+    nodeReport.getNodeState match {
+      case NodeState.DECOMMISSIONING =>
+        true
+      case _ =>
+        false
+    }
+  }
+
+  private def getHostAddress(nodeReport: NodeReport): String = {
+    new URI(s"http://${nodeReport.getHttpAddress}").getHost

Review comment:
       What about `getNodeId.getHost`?
   
   https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/yarn/api/records/NodeReport.html#getNodeId--
   https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/yarn/api/records/NodeId.html#getHost--

##########
File path: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
##########
@@ -419,6 +422,30 @@ private[yarn] class YarnAllocator(
     }
   }
 
+  private def decommissionNodes(allocateResponse: AllocateResponse): Unit = {
+    if (sparkConf.get(YARN_EXECUTOR_DECOMMISSION_ENABLED)) {
+      try {
+        allocateResponse.getUpdatedNodes.asScala.filter(shouldDecommissionNode).
+          foreach(node => driverRef.send(DecommissionExecutorsOnHost(getHostAddress(node))))
+      } catch {
+        case e: Exception => logError("Node decommissioning failed", e)
+      }
+    }
+  }
+
+  private def shouldDecommissionNode(nodeReport: NodeReport): Boolean = {
+    nodeReport.getNodeState match {
+      case NodeState.DECOMMISSIONING =>
+        true
+      case _ =>
+        false
+    }
+  }
+
+  private def getHostAddress(nodeReport: NodeReport): String = {
+    new URI(s"http://${nodeReport.getHttpAddress}").getHost

Review comment:
       and you can inline this method too




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] abhishekd0907 commented on pull request #35683: [SPARK-30873][CORE][YARN] Add support for YARN decommissioning & pre-emption

Posted by GitBox <gi...@apache.org>.
abhishekd0907 commented on pull request #35683:
URL: https://github.com/apache/spark/pull/35683#issuecomment-1070370835


   ping @attilapiros, @otterc, @zhouyejoe, @Ngone51 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] abhishekd0907 commented on pull request #35683: [SPARK-30873][CORE][YARN] Add support for YARN decommissioning & pre-emption

Posted by GitBox <gi...@apache.org>.
abhishekd0907 commented on pull request #35683:
URL: https://github.com/apache/spark/pull/35683#issuecomment-1075882552


   @attilapiros I have handled all your comments. Can you please review again?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] abhishekd0907 commented on pull request #35683: [SPARK-30873][CORE][YARN] Add support for YARN decommissioning & pre-emption

Posted by GitBox <gi...@apache.org>.
abhishekd0907 commented on pull request #35683:
URL: https://github.com/apache/spark/pull/35683#issuecomment-1075882383


   > Have you got chance to test this solution on a real cluster?
   
   yes


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] abhishekd0907 commented on pull request #35683: [SPARK-30835][CORE][YARN] Add support for YARN decommissioning & pre-emption

Posted by GitBox <gi...@apache.org>.
abhishekd0907 commented on pull request #35683:
URL: https://github.com/apache/spark/pull/35683#issuecomment-1084095300


   > To clarify, I can see a need for supporting node decomissioning in yarn - for both on-prem clusters and for cloud deployments - but the solution will need to take into account what the cluster manager supports. As I detailed above, yarn supports shuffle services and executors come and go on different nodes.
   > 
   > Decomissioning will need to account for it, which is a different model compared to k8s support.
   
   @mridulm I get your point. That's why I wanted to ask what are the expectations from this PR for changes in External Shuffle Service.  I noticed **Part 8 (Optional): When there is a node-local external shuffle service** of the design DOC talks to some extent about this and can work according to it. But Do I need to include changes pertaining to External Shuffle Service in this PR itself or mark this PR as a sub-task of the SPARK-30835 feature and pursue ShuffleService side changes in a separate PR?
    
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhouyejoe commented on pull request #35683: [SPARK-30835][CORE][YARN] Add support for YARN decommissioning & pre-emption

Posted by GitBox <gi...@apache.org>.
zhouyejoe commented on pull request #35683:
URL: https://github.com/apache/spark/pull/35683#issuecomment-1084145891


   > Can you clarify the expectations for ESS and push-based shuffle?
   
   The push based shuffle on the decommissioned nodes will need to be covered in another ticket, such as not using the ESS on decommissioned nodes as potential mergers for later stages, and whether to update the merge status in MapOutputTracker as those nodes may potentially not be available which will later lead to merged shuffle fetch failure, and many more to make the process smooth.
   
   This PR only covers the case for Executors. I think it should be only a sub task for [SPARK-30835](https://issues.apache.org/jira/browse/SPARK-30835). And we can continue to add more sub tasks into it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on pull request #35683: [SPARK-30873][CORE][YARN] Add support for YARN decommissioning & pre-emption

Posted by GitBox <gi...@apache.org>.
mridulm commented on pull request #35683:
URL: https://github.com/apache/spark/pull/35683#issuecomment-1054983979


   +CC @attilapiros, @otterc, @zhouyejoe 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] attilapiros commented on a change in pull request #35683: [SPARK-30873][CORE][YARN] Add support for YARN decommissioning & pre-emption

Posted by GitBox <gi...@apache.org>.
attilapiros commented on a change in pull request #35683:
URL: https://github.com/apache/spark/pull/35683#discussion_r829185089



##########
File path: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
##########
@@ -419,6 +422,30 @@ private[yarn] class YarnAllocator(
     }
   }
 
+  private def decommissionNodes(allocateResponse: AllocateResponse): Unit = {
+    if (sparkConf.get(YARN_EXECUTOR_DECOMMISSION_ENABLED)) {

Review comment:
       I would move this `if` into the `allocateResources` method as early as possibly (as it would improve code readability: saves an unnecessary jump).

##########
File path: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
##########
@@ -419,6 +422,30 @@ private[yarn] class YarnAllocator(
     }
   }
 
+  private def decommissionNodes(allocateResponse: AllocateResponse): Unit = {

Review comment:
       rename:
   ```suggestion
     private def handleNodesInDecommissioningState(allocateResponse: AllocateResponse): Unit = {
   ```

##########
File path: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
##########
@@ -419,6 +422,30 @@ private[yarn] class YarnAllocator(
     }
   }
 
+  private def decommissionNodes(allocateResponse: AllocateResponse): Unit = {
+    if (sparkConf.get(YARN_EXECUTOR_DECOMMISSION_ENABLED)) {
+      try {
+        allocateResponse.getUpdatedNodes.asScala.filter(shouldDecommissionNode).

Review comment:
       Please pay attention to the indentation:
   
   ```
           allocateResponse.getUpdatedNodes.asScala.filter(_.getNodeState == NodeState.DECOMMISSIONING).
   ```

##########
File path: resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
##########
@@ -706,4 +714,74 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
       sparkConf.set(MEMORY_OFFHEAP_SIZE, originalOffHeapSize)
     }
   }
+
+  test("Test YARN container decommissioning") {
+    sparkConf.set(YARN_EXECUTOR_DECOMMISSION_ENABLED.key, "true")

Review comment:
       I would move this into the beginning of the suite and remove the cleanup.
   
   As in case of an exception the cleanup(`sparkConf.remove(YARN_EXECUTOR_DECOMMISSION_ENABLED.key)`) won't be called at all and there is no test where this flag would disturb the run.
   
   
   

##########
File path: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
##########
@@ -419,6 +422,30 @@ private[yarn] class YarnAllocator(
     }
   }
 
+  private def decommissionNodes(allocateResponse: AllocateResponse): Unit = {
+    if (sparkConf.get(YARN_EXECUTOR_DECOMMISSION_ENABLED)) {
+      try {
+        allocateResponse.getUpdatedNodes.asScala.filter(shouldDecommissionNode).
+          foreach(node => driverRef.send(DecommissionExecutorsOnHost(getHostAddress(node))))
+      } catch {
+        case e: Exception => logError("Node decommissioning failed", e)
+      }
+    }
+  }
+
+  private def shouldDecommissionNode(nodeReport: NodeReport): Boolean = {
+    nodeReport.getNodeState match {
+      case NodeState.DECOMMISSIONING =>
+        true
+      case _ =>
+        false
+    }
+  }
+
+  private def getHostAddress(nodeReport: NodeReport): String = {
+    new URI(s"http://${nodeReport.getHttpAddress}").getHost

Review comment:
       What about `getNodeId.getHost`?
   
   https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/yarn/api/records/NodeReport.html#getNodeId--
   https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/yarn/api/records/NodeId.html#getHost--

##########
File path: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
##########
@@ -419,6 +422,30 @@ private[yarn] class YarnAllocator(
     }
   }
 
+  private def decommissionNodes(allocateResponse: AllocateResponse): Unit = {
+    if (sparkConf.get(YARN_EXECUTOR_DECOMMISSION_ENABLED)) {
+      try {
+        allocateResponse.getUpdatedNodes.asScala.filter(shouldDecommissionNode).
+          foreach(node => driverRef.send(DecommissionExecutorsOnHost(getHostAddress(node))))
+      } catch {
+        case e: Exception => logError("Node decommissioning failed", e)
+      }
+    }
+  }
+
+  private def shouldDecommissionNode(nodeReport: NodeReport): Boolean = {
+    nodeReport.getNodeState match {
+      case NodeState.DECOMMISSIONING =>
+        true
+      case _ =>
+        false
+    }
+  }
+
+  private def getHostAddress(nodeReport: NodeReport): String = {
+    new URI(s"http://${nodeReport.getHttpAddress}").getHost

Review comment:
       and you can inline this method too

##########
File path: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
##########
@@ -419,6 +422,30 @@ private[yarn] class YarnAllocator(
     }
   }
 
+  private def decommissionNodes(allocateResponse: AllocateResponse): Unit = {
+    if (sparkConf.get(YARN_EXECUTOR_DECOMMISSION_ENABLED)) {
+      try {
+        allocateResponse.getUpdatedNodes.asScala.filter(shouldDecommissionNode).

Review comment:
       You can simply use `==` here like:
   
   ```
           allocateResponse.getUpdatedNodes.asScala.filter(_.getNodeState == NodeState.DECOMMISSIONING).
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhouyejoe commented on a change in pull request #35683: [SPARK-30835][CORE][YARN] Add support for YARN decommissioning & pre-emption

Posted by GitBox <gi...@apache.org>.
zhouyejoe commented on a change in pull request #35683:
URL: https://github.com/apache/spark/pull/35683#discussion_r839220643



##########
File path: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
##########
@@ -422,6 +428,21 @@ private[yarn] class YarnAllocator(
     }
   }
 
+  private def handleNodesInDecommissioningState(allocateResponse: AllocateResponse): Unit = {
+    try {
+      // some of the nodes are put in decommissioning state where RM did allocate

Review comment:
       Nit: Capital S




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on pull request #35683: [SPARK-30835][CORE][YARN] Add support for YARN decommissioning & pre-emption

Posted by GitBox <gi...@apache.org>.
mridulm commented on pull request #35683:
URL: https://github.com/apache/spark/pull/35683#issuecomment-1082010575


   > If decommissioning is enabled, shuffle blocks are already migrated to another node before the executor exits. However, this is best effort. So shuffle data might be left if shuffle blocks migration is left incomplete or the executor doesn't exit cleanly. Can you please clarify what are the expectations for shuffle data when a node is put in decommissioning state?
   
   
   I am not sure I follow.
   In k8s, where there is no shuffle service - it makes sense to move shuffle blocks when executor is getting decommissioned.
   In yarn, particularly with dynamic resource allocation (but even without it), it is very common for executors to exit - while shuffle service continues to serve the shuffle blocks generated on that node, which might have no active executors for application on them.
   
   We should not be moving shuffle blocks when an executor is exiting - only when the node is actually getting decommissioned (they might be the same for k8s, not for yarn).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] abhishekd0907 commented on pull request #35683: [SPARK-30835][CORE][YARN] Add support for YARN decommissioning & pre-emption

Posted by GitBox <gi...@apache.org>.
abhishekd0907 commented on pull request #35683:
URL: https://github.com/apache/spark/pull/35683#issuecomment-1086663422


   @mridulm @tgravescs 
   Do you agree with @zhouyejoe that we can mark this PR as a subtask of Yarn Decommissioning feature and complete it's review? I can create a new subtask and separate PR for Shuffle Service side changes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] tgravescs commented on pull request #35683: [SPARK-30835][CORE][YARN] Add support for YARN decommissioning & pre-emption

Posted by GitBox <gi...@apache.org>.
tgravescs commented on pull request #35683:
URL: https://github.com/apache/spark/pull/35683#issuecomment-1084566941


   I don't remember off the top of my head how tez/MR handle that.  I don't think MR does, but TEZ likely does.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on a change in pull request #35683: [SPARK-30873][CORE][YARN] Add support for YARN decommissioning & pre-emption

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #35683:
URL: https://github.com/apache/spark/pull/35683#discussion_r832887975



##########
File path: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
##########
@@ -401,6 +401,10 @@ private[yarn] class YarnAllocator(
     val allocatedContainers = allocateResponse.getAllocatedContainers()
     allocatorNodeHealthTracker.setNumClusterNodes(allocateResponse.getNumClusterNodes)
 
+    if (sparkConf.get(YARN_EXECUTOR_DECOMMISSION_ENABLED)) {

Review comment:
       Make this a field in the class.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on pull request #35683: [SPARK-30873][CORE][YARN] Add support for YARN decommissioning & pre-emption

Posted by GitBox <gi...@apache.org>.
mridulm commented on pull request #35683:
URL: https://github.com/apache/spark/pull/35683#issuecomment-1075968542


   +CC @otterc push based shuffle would also be impacted by node decomissioning. Please take a look.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] abhishekd0907 commented on pull request #35683: [SPARK-30835][CORE][YARN] Add support for YARN decommissioning & pre-emption

Posted by GitBox <gi...@apache.org>.
abhishekd0907 commented on pull request #35683:
URL: https://github.com/apache/spark/pull/35683#issuecomment-1081456688


   > Not quite sure about what RM will respond to the AM's allocateResource call. Will the response include resources on nodes that are being decommissioned?IIUC, RM should not allocate new containers on those nodes after they get commissioned. Or RM will actually return to AM, that some of the nodes are getting decommissioned, where RM did allocate resources on those nodes for earlier allocateResource calls? If so, please add a comment in `handleNodesInDecommissioningState`. Thanks. Indeed we should also better handling the node decommission for push based shuffle, it won't trigger retry though as it will fall back. But this case also applies to ESS cases where they have the unmerged shuffle files, but it will trigger stage retry to regenerate the unmerged shuffle data in other nodes. We did get bothered, during some NM updates that couldn't go with work preserving restart. We issued the decommission to a few nodes in batches, and waited until the shuffle data under tmp got cleaned 
 up, to make sure the whole cluster NM updates didn't trigger retries. This does increase the ops overhead.
   
   @zhouyejoe 
   I have added the comment, Can you clarify the expectations for ESS and push-based shuffle?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] abhishekd0907 commented on pull request #35683: [SPARK-30835][CORE][YARN] Add support for YARN decommissioning & pre-emption

Posted by GitBox <gi...@apache.org>.
abhishekd0907 commented on pull request #35683:
URL: https://github.com/apache/spark/pull/35683#issuecomment-1081453295


   > What happens to nodes which are getting decomissioned, which have shuffle data on them (but no active executors) ?
   
   @mridulm 
   If decommissioning is enabled, shuffle blocks are already migrated to another node before the executor exits. However, this is best effort. So shuffle data might be left if shuffle blocks migration is left incomplete or the executor doesn't exit cleanly. Can you please clarify what are the expectations for shuffle data when a node is put in decommissioning state?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] attilapiros commented on a change in pull request #35683: [SPARK-30873][CORE][YARN] Add support for YARN decommissioning & pre-emption

Posted by GitBox <gi...@apache.org>.
attilapiros commented on a change in pull request #35683:
URL: https://github.com/apache/spark/pull/35683#discussion_r829185649



##########
File path: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
##########
@@ -419,6 +422,30 @@ private[yarn] class YarnAllocator(
     }
   }
 
+  private def decommissionNodes(allocateResponse: AllocateResponse): Unit = {
+    if (sparkConf.get(YARN_EXECUTOR_DECOMMISSION_ENABLED)) {
+      try {
+        allocateResponse.getUpdatedNodes.asScala.filter(shouldDecommissionNode).

Review comment:
       You can simply use `==` here like:
   
   ```
           allocateResponse.getUpdatedNodes.asScala.filter(_.getNodeState == NodeState.DECOMMISSIONING).
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #35683: [SPARK-30873][CORE][YARN] Add support for YARN decommissioning & pre-emption

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #35683:
URL: https://github.com/apache/spark/pull/35683#issuecomment-1054831569


   Can one of the admins verify this patch?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org