Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/03/17 15:59:24 UTC

[GitHub] [spark] attilapiros commented on a change in pull request #35683: [SPARK-30873][CORE][YARN] Add support for YARN decommissioning & pre-emption

attilapiros commented on a change in pull request #35683:
URL: https://github.com/apache/spark/pull/35683#discussion_r829185089



##########
File path: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
##########
@@ -419,6 +422,30 @@ private[yarn] class YarnAllocator(
     }
   }
 
+  private def decommissionNodes(allocateResponse: AllocateResponse): Unit = {
+    if (sparkConf.get(YARN_EXECUTOR_DECOMMISSION_ENABLED)) {

Review comment:
       I would move this `if` into the `allocateResources` method, as early as possible (it would improve code readability by saving an unnecessary jump).
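   A minimal sketch of what that would look like, assuming `decommissionNodes` is called from `allocateResources` right after the AM client's `allocate` call (the surrounding code is not part of this hunk, so the exact placement and names are illustrative):
   
   ```scala
   // Illustrative only: guard on the config flag at the call site, so readers of
   // allocateResources see immediately whether decommission handling is active.
   def allocateResources(): Unit = synchronized {
     // ... existing request handling ...
     val allocateResponse = amClient.allocate(progressIndicator)
     if (sparkConf.get(YARN_EXECUTOR_DECOMMISSION_ENABLED)) {
       handleNodesInDecommissioningState(allocateResponse)
     }
     // ... rest of the method ...
   }
   ```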

##########
File path: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
##########
@@ -419,6 +422,30 @@ private[yarn] class YarnAllocator(
     }
   }
 
+  private def decommissionNodes(allocateResponse: AllocateResponse): Unit = {

Review comment:
       rename:
   ```suggestion
     private def handleNodesInDecommissioningState(allocateResponse: AllocateResponse): Unit = {
   ```

##########
File path: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
##########
@@ -419,6 +422,30 @@ private[yarn] class YarnAllocator(
     }
   }
 
+  private def decommissionNodes(allocateResponse: AllocateResponse): Unit = {
+    if (sparkConf.get(YARN_EXECUTOR_DECOMMISSION_ENABLED)) {
+      try {
+        allocateResponse.getUpdatedNodes.asScala.filter(shouldDecommissionNode).

Review comment:
       Please pay attention to the indentation:
   
   ```
           allocateResponse.getUpdatedNodes.asScala.filter(_.getNodeState == NodeState.DECOMMISSIONING).
   ```

##########
File path: resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
##########
@@ -706,4 +714,74 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
       sparkConf.set(MEMORY_OFFHEAP_SIZE, originalOffHeapSize)
     }
   }
+
+  test("Test YARN container decommissioning") {
+    sparkConf.set(YARN_EXECUTOR_DECOMMISSION_ENABLED.key, "true")

Review comment:
       I would move this to the beginning of the suite and remove the cleanup.
   
   In case of an exception the cleanup (`sparkConf.remove(YARN_EXECUTOR_DECOMMISSION_ENABLED.key)`) won't be called at all, and there is no other test whose run this flag would disturb.
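   
   A rough sketch of the two patterns being compared (the suite's actual setup code is not shown in this hunk, so names and placement are illustrative):
   
   ```scala
   // Current pattern (illustrative): the remove() below is skipped whenever an
   // assertion or exception fires earlier in the test body.
   test("Test YARN container decommissioning") {
     sparkConf.set(YARN_EXECUTOR_DECOMMISSION_ENABLED.key, "true")
     // ... test body that may throw ...
     sparkConf.remove(YARN_EXECUTOR_DECOMMISSION_ENABLED.key)  // not reached on failure
   }
   
   // Suggested alternative (illustrative): set the flag once where the suite-wide
   // SparkConf is built, so no per-test cleanup is needed and a failing test
   // cannot leak the flag into later tests.
   val sparkConf = new SparkConf()
     .set(YARN_EXECUTOR_DECOMMISSION_ENABLED.key, "true")
     // ... other suite-wide settings ...
   ```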
   
   
   

##########
File path: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
##########
@@ -419,6 +422,30 @@ private[yarn] class YarnAllocator(
     }
   }
 
+  private def decommissionNodes(allocateResponse: AllocateResponse): Unit = {
+    if (sparkConf.get(YARN_EXECUTOR_DECOMMISSION_ENABLED)) {
+      try {
+        allocateResponse.getUpdatedNodes.asScala.filter(shouldDecommissionNode).
+          foreach(node => driverRef.send(DecommissionExecutorsOnHost(getHostAddress(node))))
+      } catch {
+        case e: Exception => logError("Node decommissioning failed", e)
+      }
+    }
+  }
+
+  private def shouldDecommissionNode(nodeReport: NodeReport): Boolean = {
+    nodeReport.getNodeState match {
+      case NodeState.DECOMMISSIONING =>
+        true
+      case _ =>
+        false
+    }
+  }
+
+  private def getHostAddress(nodeReport: NodeReport): String = {
+    new URI(s"http://${nodeReport.getHttpAddress}").getHost

Review comment:
       What about `getNodeId.getHost`?
   
   https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/yarn/api/records/NodeReport.html#getNodeId--
   https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/yarn/api/records/NodeId.html#getHost--
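   
   A minimal sketch of that simpler lookup; `NodeReport.getNodeId` and `NodeId.getHost` are the documented Hadoop APIs linked above, so the URI round-trip is not needed:
   
   ```scala
   // Read the host directly from the node id instead of parsing the HTTP address.
   private def getHostAddress(nodeReport: NodeReport): String =
     nodeReport.getNodeId.getHost
   ```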

##########
File path: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
##########
@@ -419,6 +422,30 @@ private[yarn] class YarnAllocator(
     }
   }
 
+  private def decommissionNodes(allocateResponse: AllocateResponse): Unit = {
+    if (sparkConf.get(YARN_EXECUTOR_DECOMMISSION_ENABLED)) {
+      try {
+        allocateResponse.getUpdatedNodes.asScala.filter(shouldDecommissionNode).
+          foreach(node => driverRef.send(DecommissionExecutorsOnHost(getHostAddress(node))))
+      } catch {
+        case e: Exception => logError("Node decommissioning failed", e)
+      }
+    }
+  }
+
+  private def shouldDecommissionNode(nodeReport: NodeReport): Boolean = {
+    nodeReport.getNodeState match {
+      case NodeState.DECOMMISSIONING =>
+        true
+      case _ =>
+        false
+    }
+  }
+
+  private def getHostAddress(nodeReport: NodeReport): String = {
+    new URI(s"http://${nodeReport.getHttpAddress}").getHost

Review comment:
       And you can inline this method too.
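   
   Putting the comments in this thread together, the handler could collapse to something like the sketch below (assuming the config guard has moved to the call site in `allocateResources`, as suggested earlier; this is illustrative, not the final patch):
   
   ```scala
   // Sketch: NodeState filter inlined, host taken from getNodeId.getHost, and the
   // enable-flag check done by the caller.
   private def handleNodesInDecommissioningState(allocateResponse: AllocateResponse): Unit = {
     try {
       allocateResponse.getUpdatedNodes.asScala
         .filter(_.getNodeState == NodeState.DECOMMISSIONING)
         .foreach { node =>
           driverRef.send(DecommissionExecutorsOnHost(node.getNodeId.getHost))
         }
     } catch {
       case e: Exception => logError("Node decommissioning failed", e)
     }
   }
   ```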




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
