Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2020/03/02 11:07:28 UTC

[GitHub] [spark] SaurabhChawla100 commented on a change in pull request #27636: [SPARK-30873][CORE][YARN]Handling Node Decommissioning for Yarn cluster manger in Spark

SaurabhChawla100 commented on a change in pull request #27636: [SPARK-30873][CORE][YARN]Handling Node Decommissioning for Yarn cluster manger in Spark
URL: https://github.com/apache/spark/pull/27636#discussion_r386329460
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
 ##########
 @@ -1542,4 +1542,50 @@ package object config {
     .bytesConf(ByteUnit.BYTE)
     .createOptional
 
+  private[spark] val GRACEFUL_DECOMMISSION_ENABLE =
+    ConfigBuilder("spark.graceful.decommission.enable")
+      .doc("Whether to enable the node graceful decommissioning handling")
+      .booleanConf
+      .createWithDefault(false)
+
+  private[spark] val GRACEFUL_DECOMMISSION_FETCHFAILED_IGNORE_THRESHOLD =
+    ConfigBuilder("spark.graceful.decommission.fetchfailed.ignore.threshold")
+      .doc("Threshold of number of times fetchfailed ignored due to node" +
+        " decommission.This is configurable as per the need of the user and" +
+        " depending upon type of the cloud. If we keep this a large value and " +
+        " there is continuous decommission of nodes, in those scenarios stage" +
+        " will never abort and keeps on retrying in an unbounded manner.")
+      .intConf
+      .createWithDefault(8)
+
+  private[spark] val GRACEFUL_DECOMMISSION_EXECUTOR_LEASETIME_PCT =
+    ConfigBuilder("spark.graceful.decommission.executor.leasetimePct")
+      .doc("Percentage of time to expiry after which executors are killed " +
+        "(if enabled) on the node. Value ranges between (0-100)")
+      .intConf
+      .checkValue(v => v >= 0 && v < 100, "The percentage should be positive.")
+      .createWithDefault(50) // Pulled out of thin air.
+
+  private[spark] val GRACEFUL_DECOMMISSION_SHUFFLEDATA_LEASETIME_PCT =
+    ConfigBuilder("spark.graceful.decommission.shuffedata.leasetimePct")
+      .doc("Percentage of time to expiry after which shuffle data " +
+        "cleaned up (if enabled) on the node. Value ranges between (0-100)")
+      .intConf
+      .checkValue(v => v >= 0 && v < 100, "The percentage should be positive.")
+      .createWithDefault(90) // Pulled out of thin air.
+
+  private[spark] val GRACEFUL_DECOMMISSION_MIN_TERMINATION_TIME_IN_SEC =
+    ConfigBuilder("spark.graceful.decommission.min.termination.time")
+      .doc("Minimum time to termination below which node decommissioning is performed immediately")
+      .timeConf(TimeUnit.SECONDS)
+      .createWithDefaultString("60s")
+
+  private[spark] val GRACEFUL_DECOMMISSION_NODE_TIMEOUT =
+    ConfigBuilder("spark.graceful.decommission.node.timeout")
 
 Review comment:
   For Hadoop 3.1 and later, we can get the decommission timeout from Hadoop itself, so we can use that value to decide when the node will be decommissioned.
   Lower versions of Hadoop (hadoop-2.8) provide no decommissionTimeout for decommissioning nodes. For those scenarios we already know from experience that, after the node decommissioning notice is received from the Hadoop end, an AWS spot-loss node stays up for 2 min and a GCP preemptible VM stays up for 30 sec.
   
   This config is added here so that node decommissioning works across multiple versions of Hadoop.
   
   Please find below the logic used in YarnAllocator.scala to decide the timeout of the node:
   
   ```
   if (x.getNodeState.toString.equals(NodeState.DECOMMISSIONING.toString)) {
     // Hadoop 2.7 does not support getDecommissioningTimeout, whereas Hadoop 3.1
     // and later do. The method is therefore called via reflection to update
     // nodeTerminationTime. On lower Hadoop versions (2.7) we fall back to the
     // cloud-specific config spark.graceful.decommission.node.timeout.
     var nodeTerminationTime = clock.getTimeMillis() + nodeLossInterval * 1000
     try {
       val decommissioningTimeout = x.getClass.getMethod(
         "getDecommissioningTimeout").invoke(x).asInstanceOf[Integer]
       if (decommissioningTimeout != null) {
         nodeTerminationTime = clock.getTimeMillis() + decommissioningTimeout * 1000
       }
     } catch {
       case e: NoSuchMethodException => logDebug(e.toString)
     }
   ```
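
To make the fallback behaviour easier to follow in isolation, here is a minimal, self-contained sketch of the same idea. It does not use the real YARN classes: `LegacyNodeReport`, `ModernNodeReport`, `DecommissionTimeout` and `nodeTerminationTime` are hypothetical names introduced only for illustration, and the fallback value stands in for `spark.graceful.decommission.node.timeout`.

```scala
// Hypothetical stand-ins for YARN's node report; NOT the real Hadoop classes.
class LegacyNodeReport // mimics older Hadoop: no getDecommissioningTimeout

class ModernNodeReport(timeoutSec: Integer) { // mimics the Hadoop 3.1+ accessor
  def getDecommissioningTimeout: Integer = timeoutSec
}

object DecommissionTimeout {
  /**
   * Returns the absolute time (in ms) at which the node is expected to go away.
   * Uses the timeout reported by the node if the accessor exists and returns a
   * non-null value; otherwise uses the configured fallback (in seconds).
   */
  def nodeTerminationTime(report: AnyRef, nowMillis: Long, fallbackSeconds: Long): Long = {
    val reportedSeconds: Option[Long] =
      try {
        // Look the accessor up reflectively so this compiles against any version.
        val method = report.getClass.getMethod("getDecommissioningTimeout")
        Option(method.invoke(report).asInstanceOf[Integer]).map(_.longValue)
      } catch {
        // Accessor is absent on this (older) report type: use the fallback.
        case _: NoSuchMethodException => None
      }
    // Multiply as Long to avoid Int overflow for large timeouts.
    nowMillis + reportedSeconds.getOrElse(fallbackSeconds) * 1000L
  }
}
```

With this sketch, a report that exposes a timeout of 120 seconds yields `now + 120000`, while a report without the accessor (or one returning null) falls back to the configured value, for example 120 seconds on AWS or 30 seconds on GCP as described above.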
