You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ir...@apache.org on 2019/03/04 20:14:30 UTC

[spark] branch master updated: [SPARK-26688][YARN] Provide configuration of initially blacklisted YARN nodes

This is an automated email from the ASF dual-hosted git repository.

irashid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new caceaec  [SPARK-26688][YARN] Provide configuration of initially blacklisted YARN nodes
caceaec is described below

commit caceaec93203edaea1d521b88e82ef67094cdea9
Author: “attilapiros” <pi...@gmail.com>
AuthorDate: Mon Mar 4 14:14:20 2019 -0600

    [SPARK-26688][YARN] Provide configuration of initially blacklisted YARN nodes
    
    ## What changes were proposed in this pull request?
    
    Introducing a new config for initially blacklisted YARN nodes.
    
    ## How was this patch tested?
    
    With existing and a new unit test.
    
    Closes #23616 from attilapiros/SPARK-26688.
    
    Lead-authored-by: “attilapiros” <pi...@gmail.com>
    Co-authored-by: Attila Zsolt Piros <20...@users.noreply.github.com>
    Signed-off-by: Imran Rashid <ir...@cloudera.com>
---
 docs/running-on-yarn.md                            |  7 +++++
 .../yarn/YarnAllocatorBlacklistTracker.scala       |  4 ++-
 .../org/apache/spark/deploy/yarn/config.scala      |  6 ++++
 .../yarn/YarnAllocatorBlacklistTrackerSuite.scala  | 33 ++++++++++++++++------
 4 files changed, 41 insertions(+), 9 deletions(-)

diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 6ee4b3d..261f7e3 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -463,6 +463,13 @@ To use a custom metrics.properties for the application master and executors, upd
   </td>
 </tr>
 <tr>
+  <td><code>spark.yarn.exclude.nodes</code></td>
+  <td>(none)</td>
+  <td>
+  Comma-separated list of YARN node names which are excluded from resource allocation.
+  </td>
+</tr>
+<tr>
   <td><code>spark.yarn.metrics.namespace</code></td>
   <td>(none)</td>
   <td>
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTracker.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTracker.scala
index 268976b..fa8c961 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTracker.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTracker.scala
@@ -56,6 +56,8 @@ private[spark] class YarnAllocatorBlacklistTracker(
 
   private val maxFailuresPerHost = sparkConf.get(MAX_FAILED_EXEC_PER_NODE)
 
+  private val excludeNodes = sparkConf.get(YARN_EXCLUDE_NODES).toSet
+
   private val allocatorBlacklist = new HashMap[String, Long]()
 
   private var currentBlacklistedYarnNodes = Set.empty[String]
@@ -105,7 +107,7 @@ private[spark] class YarnAllocatorBlacklistTracker(
 
   private def refreshBlacklistedNodes(): Unit = {
     removeExpiredYarnBlacklistedNodes()
-    val allBlacklistedNodes = schedulerBlacklist ++ allocatorBlacklist.keySet
+    val allBlacklistedNodes = excludeNodes ++ schedulerBlacklist ++ allocatorBlacklist.keySet
     synchronizeBlacklistedNodeWithYarn(allBlacklistedNodes)
   }
 
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
index 5f8739b..4c187b2 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
@@ -310,6 +310,12 @@ package object config {
       .booleanConf
       .createWithDefault(false)
 
+  /* Initially blacklisted YARN nodes. */
+  private[spark] val YARN_EXCLUDE_NODES = ConfigBuilder("spark.yarn.exclude.nodes")
+      .stringConf
+      .toSequence
+      .createWithDefault(Nil)
+
   private[yarn] val YARN_EXECUTOR_RESOURCE_TYPES_PREFIX = "spark.yarn.executor.resource."
   private[yarn] val YARN_DRIVER_RESOURCE_TYPES_PREFIX = "spark.yarn.driver.resource."
   private[yarn] val YARN_AM_RESOURCE_TYPES_PREFIX = "spark.yarn.am.resource."
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTrackerSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTrackerSuite.scala
index 259d758..c07a4ac 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTrackerSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTrackerSuite.scala
@@ -25,7 +25,7 @@ import org.mockito.Mockito._
 import org.scalatest.{BeforeAndAfterEach, Matchers}
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.deploy.yarn.config.YARN_EXECUTOR_LAUNCH_BLACKLIST_ENABLED
+import org.apache.spark.deploy.yarn.config.{YARN_EXCLUDE_NODES, YARN_EXECUTOR_LAUNCH_BLACKLIST_ENABLED}
 import org.apache.spark.internal.config.{BLACKLIST_TIMEOUT_CONF, MAX_FAILED_EXEC_PER_NODE}
 import org.apache.spark.util.ManualClock
 
@@ -35,27 +35,31 @@ class YarnAllocatorBlacklistTrackerSuite extends SparkFunSuite with Matchers
   val BLACKLIST_TIMEOUT = 100L
   val MAX_FAILED_EXEC_PER_NODE_VALUE = 2
 
+  var sparkConf: SparkConf = _
   var amClientMock: AMRMClient[ContainerRequest] = _
-  var yarnBlacklistTracker: YarnAllocatorBlacklistTracker = _
-  var failureTracker: FailureTracker = _
   var clock: ManualClock = _
 
   override def beforeEach(): Unit = {
-    val sparkConf = new SparkConf()
+    sparkConf = new SparkConf()
     sparkConf.set(BLACKLIST_TIMEOUT_CONF, BLACKLIST_TIMEOUT)
     sparkConf.set(YARN_EXECUTOR_LAUNCH_BLACKLIST_ENABLED, true)
     sparkConf.set(MAX_FAILED_EXEC_PER_NODE, MAX_FAILED_EXEC_PER_NODE_VALUE)
     clock = new ManualClock()
-
     amClientMock = mock(classOf[AMRMClient[ContainerRequest]])
-    failureTracker = new FailureTracker(sparkConf, clock)
-    yarnBlacklistTracker =
+    super.beforeEach()
+  }
+
+  private def createYarnAllocatorBlacklistTracker(
+      sparkConf: SparkConf = sparkConf): YarnAllocatorBlacklistTracker = {
+    val failureTracker = new FailureTracker(sparkConf, clock)
+    val yarnBlacklistTracker =
       new YarnAllocatorBlacklistTracker(sparkConf, amClientMock, failureTracker)
     yarnBlacklistTracker.setNumClusterNodes(4)
-    super.beforeEach()
+    yarnBlacklistTracker
   }
 
   test("expiring its own blacklisted nodes") {
+    val yarnBlacklistTracker = createYarnAllocatorBlacklistTracker()
     (1 to MAX_FAILED_EXEC_PER_NODE_VALUE).foreach {
       _ => {
         yarnBlacklistTracker.handleResourceAllocationFailure(Some("host"))
@@ -77,6 +81,8 @@ class YarnAllocatorBlacklistTrackerSuite extends SparkFunSuite with Matchers
   }
 
   test("not handling the expiry of scheduler blacklisted nodes") {
+    val yarnBlacklistTracker = createYarnAllocatorBlacklistTracker()
+
     yarnBlacklistTracker.setSchedulerBlacklistedNodes(Set("host1", "host2"))
     verify(amClientMock)
       .updateBlacklist(Arrays.asList("host1", "host2"), Collections.emptyList())
@@ -91,6 +97,15 @@ class YarnAllocatorBlacklistTrackerSuite extends SparkFunSuite with Matchers
   }
 
   test("combining scheduler and allocation blacklist") {
+    sparkConf.set(YARN_EXCLUDE_NODES, Seq("initial1", "initial2"))
+    val yarnBlacklistTracker = createYarnAllocatorBlacklistTracker(sparkConf)
+    yarnBlacklistTracker.setSchedulerBlacklistedNodes(Set())
+
+    // initial1 and initial2 are added as blacklisted nodes at the very first updateBlacklist call
+    // and they are never removed
+    verify(amClientMock)
+      .updateBlacklist(Arrays.asList("initial1", "initial2"), Collections.emptyList())
+
     (1 to MAX_FAILED_EXEC_PER_NODE_VALUE).foreach {
       _ => {
         yarnBlacklistTracker.handleResourceAllocationFailure(Some("host1"))
@@ -117,6 +132,7 @@ class YarnAllocatorBlacklistTrackerSuite extends SparkFunSuite with Matchers
   }
 
   test("blacklist all available nodes") {
+    val yarnBlacklistTracker = createYarnAllocatorBlacklistTracker()
     yarnBlacklistTracker.setSchedulerBlacklistedNodes(Set("host1", "host2", "host3"))
     verify(amClientMock)
       .updateBlacklist(Arrays.asList("host1", "host2", "host3"), Collections.emptyList())
@@ -137,4 +153,5 @@ class YarnAllocatorBlacklistTrackerSuite extends SparkFunSuite with Matchers
     verify(amClientMock).updateBlacklist(Arrays.asList("host4"), Collections.emptyList())
     assert(yarnBlacklistTracker.isAllNodeBlacklisted)
   }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org