You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ir...@apache.org on 2019/03/04 20:14:30 UTC
[spark] branch master updated: [SPARK-26688][YARN] Provide
configuration of initially blacklisted YARN nodes
This is an automated email from the ASF dual-hosted git repository.
irashid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new caceaec [SPARK-26688][YARN] Provide configuration of initially blacklisted YARN nodes
caceaec is described below
commit caceaec93203edaea1d521b88e82ef67094cdea9
Author: “attilapiros” <pi...@gmail.com>
AuthorDate: Mon Mar 4 14:14:20 2019 -0600
[SPARK-26688][YARN] Provide configuration of initially blacklisted YARN nodes
## What changes were proposed in this pull request?
Introducing a new config for initially blacklisted YARN nodes.
## How was this patch tested?
With existing and a new unit test.
Closes #23616 from attilapiros/SPARK-26688.
Lead-authored-by: “attilapiros” <pi...@gmail.com>
Co-authored-by: Attila Zsolt Piros <20...@users.noreply.github.com>
Signed-off-by: Imran Rashid <ir...@cloudera.com>
---
docs/running-on-yarn.md | 7 +++++
.../yarn/YarnAllocatorBlacklistTracker.scala | 4 ++-
.../org/apache/spark/deploy/yarn/config.scala | 6 ++++
.../yarn/YarnAllocatorBlacklistTrackerSuite.scala | 33 ++++++++++++++++------
4 files changed, 41 insertions(+), 9 deletions(-)
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 6ee4b3d..261f7e3 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -463,6 +463,13 @@ To use a custom metrics.properties for the application master and executors, upd
</td>
</tr>
<tr>
+ <td><code>spark.yarn.exclude.nodes</code></td>
+ <td>(none)</td>
+ <td>
+ Comma-separated list of YARN node names which are excluded from resource allocation.
+ </td>
+</tr>
+<tr>
<td><code>spark.yarn.metrics.namespace</code></td>
<td>(none)</td>
<td>
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTracker.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTracker.scala
index 268976b..fa8c961 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTracker.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTracker.scala
@@ -56,6 +56,8 @@ private[spark] class YarnAllocatorBlacklistTracker(
private val maxFailuresPerHost = sparkConf.get(MAX_FAILED_EXEC_PER_NODE)
+ private val excludeNodes = sparkConf.get(YARN_EXCLUDE_NODES).toSet
+
private val allocatorBlacklist = new HashMap[String, Long]()
private var currentBlacklistedYarnNodes = Set.empty[String]
@@ -105,7 +107,7 @@ private[spark] class YarnAllocatorBlacklistTracker(
private def refreshBlacklistedNodes(): Unit = {
removeExpiredYarnBlacklistedNodes()
- val allBlacklistedNodes = schedulerBlacklist ++ allocatorBlacklist.keySet
+ val allBlacklistedNodes = excludeNodes ++ schedulerBlacklist ++ allocatorBlacklist.keySet
synchronizeBlacklistedNodeWithYarn(allBlacklistedNodes)
}
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
index 5f8739b..4c187b2 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
@@ -310,6 +310,12 @@ package object config {
.booleanConf
.createWithDefault(false)
+ /* Initially blacklisted YARN nodes. */
+ private[spark] val YARN_EXCLUDE_NODES = ConfigBuilder("spark.yarn.exclude.nodes")
+ .stringConf
+ .toSequence
+ .createWithDefault(Nil)
+
private[yarn] val YARN_EXECUTOR_RESOURCE_TYPES_PREFIX = "spark.yarn.executor.resource."
private[yarn] val YARN_DRIVER_RESOURCE_TYPES_PREFIX = "spark.yarn.driver.resource."
private[yarn] val YARN_AM_RESOURCE_TYPES_PREFIX = "spark.yarn.am.resource."
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTrackerSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTrackerSuite.scala
index 259d758..c07a4ac 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTrackerSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTrackerSuite.scala
@@ -25,7 +25,7 @@ import org.mockito.Mockito._
import org.scalatest.{BeforeAndAfterEach, Matchers}
import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.deploy.yarn.config.YARN_EXECUTOR_LAUNCH_BLACKLIST_ENABLED
+import org.apache.spark.deploy.yarn.config.{YARN_EXCLUDE_NODES, YARN_EXECUTOR_LAUNCH_BLACKLIST_ENABLED}
import org.apache.spark.internal.config.{BLACKLIST_TIMEOUT_CONF, MAX_FAILED_EXEC_PER_NODE}
import org.apache.spark.util.ManualClock
@@ -35,27 +35,31 @@ class YarnAllocatorBlacklistTrackerSuite extends SparkFunSuite with Matchers
val BLACKLIST_TIMEOUT = 100L
val MAX_FAILED_EXEC_PER_NODE_VALUE = 2
+ var sparkConf: SparkConf = _
var amClientMock: AMRMClient[ContainerRequest] = _
- var yarnBlacklistTracker: YarnAllocatorBlacklistTracker = _
- var failureTracker: FailureTracker = _
var clock: ManualClock = _
override def beforeEach(): Unit = {
- val sparkConf = new SparkConf()
+ sparkConf = new SparkConf()
sparkConf.set(BLACKLIST_TIMEOUT_CONF, BLACKLIST_TIMEOUT)
sparkConf.set(YARN_EXECUTOR_LAUNCH_BLACKLIST_ENABLED, true)
sparkConf.set(MAX_FAILED_EXEC_PER_NODE, MAX_FAILED_EXEC_PER_NODE_VALUE)
clock = new ManualClock()
-
amClientMock = mock(classOf[AMRMClient[ContainerRequest]])
- failureTracker = new FailureTracker(sparkConf, clock)
- yarnBlacklistTracker =
+ super.beforeEach()
+ }
+
+ private def createYarnAllocatorBlacklistTracker(
+ sparkConf: SparkConf = sparkConf): YarnAllocatorBlacklistTracker = {
+ val failureTracker = new FailureTracker(sparkConf, clock)
+ val yarnBlacklistTracker =
new YarnAllocatorBlacklistTracker(sparkConf, amClientMock, failureTracker)
yarnBlacklistTracker.setNumClusterNodes(4)
- super.beforeEach()
+ yarnBlacklistTracker
}
test("expiring its own blacklisted nodes") {
+ val yarnBlacklistTracker = createYarnAllocatorBlacklistTracker()
(1 to MAX_FAILED_EXEC_PER_NODE_VALUE).foreach {
_ => {
yarnBlacklistTracker.handleResourceAllocationFailure(Some("host"))
@@ -77,6 +81,8 @@ class YarnAllocatorBlacklistTrackerSuite extends SparkFunSuite with Matchers
}
test("not handling the expiry of scheduler blacklisted nodes") {
+ val yarnBlacklistTracker = createYarnAllocatorBlacklistTracker()
+
yarnBlacklistTracker.setSchedulerBlacklistedNodes(Set("host1", "host2"))
verify(amClientMock)
.updateBlacklist(Arrays.asList("host1", "host2"), Collections.emptyList())
@@ -91,6 +97,15 @@ class YarnAllocatorBlacklistTrackerSuite extends SparkFunSuite with Matchers
}
test("combining scheduler and allocation blacklist") {
+ sparkConf.set(YARN_EXCLUDE_NODES, Seq("initial1", "initial2"))
+ val yarnBlacklistTracker = createYarnAllocatorBlacklistTracker(sparkConf)
+ yarnBlacklistTracker.setSchedulerBlacklistedNodes(Set())
+
+ // initial1 and initial2 are added as blacklisted nodes at the very first updateBlacklist call
+ // and they are never removed
+ verify(amClientMock)
+ .updateBlacklist(Arrays.asList("initial1", "initial2"), Collections.emptyList())
+
(1 to MAX_FAILED_EXEC_PER_NODE_VALUE).foreach {
_ => {
yarnBlacklistTracker.handleResourceAllocationFailure(Some("host1"))
@@ -117,6 +132,7 @@ class YarnAllocatorBlacklistTrackerSuite extends SparkFunSuite with Matchers
}
test("blacklist all available nodes") {
+ val yarnBlacklistTracker = createYarnAllocatorBlacklistTracker()
yarnBlacklistTracker.setSchedulerBlacklistedNodes(Set("host1", "host2", "host3"))
verify(amClientMock)
.updateBlacklist(Arrays.asList("host1", "host2", "host3"), Collections.emptyList())
@@ -137,4 +153,5 @@ class YarnAllocatorBlacklistTrackerSuite extends SparkFunSuite with Matchers
verify(amClientMock).updateBlacklist(Arrays.asList("host4"), Collections.emptyList())
assert(yarnBlacklistTracker.isAllNodeBlacklisted)
}
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org