Posted to commits@spark.apache.org by tg...@apache.org on 2018/12/21 19:24:11 UTC
[spark] branch master updated: [SPARK-26269][YARN] YarnAllocator should have the same blacklist behaviour as YARN to maximize use of cluster resources
This is an automated email from the ASF dual-hosted git repository.
tgraves pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new d6a5f85 [SPARK-26269][YARN] YarnAllocator should have the same blacklist behaviour as YARN to maximize use of cluster resources
d6a5f85 is described below
commit d6a5f859848bbd237e19075dd26e1547fb3af417
Author: wuyi <ng...@163.com>
AuthorDate: Fri Dec 21 13:21:58 2018 -0600
[SPARK-26269][YARN] YarnAllocator should have the same blacklist behaviour as YARN to maximize use of cluster resources
## What changes were proposed in this pull request?
As mentioned in JIRA [SPARK-26269](https://issues.apache.org/jira/browse/SPARK-26269), in order to maximize the use of cluster resources, this PR makes `YarnAllocator` follow the same blacklist behaviour as YARN: container exit statuses that YARN attributes to neither the application nor a node fault (for example, containers killed by the ResourceManager or ApplicationMaster) no longer count as executor failures, while other unexpected exit statuses mark the node as a candidate for blacklisting. A sketch of the idea is shown below.
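For illustration only, not part of the commit: a minimal sketch of the classification the new catch-all case implements, assuming the Hadoop `ContainerExitStatus` constants used in the diff. The helper name is hypothetical; the real logic lives in `YarnAllocator.processCompletedContainers`, after the earlier, more specific cases (preemption, vmem/pmem limits) have been handled.

```scala
import org.apache.hadoop.yarn.api.records.ContainerExitStatus

// Exit statuses that YARN attributes to neither the application nor a node
// fault (mirrors NOT_APP_AND_SYSTEM_FAULT_EXIT_STATUS in the diff below).
val notAppOrNodeFault = Set(
  ContainerExitStatus.KILLED_BY_RESOURCEMANAGER,
  ContainerExitStatus.KILLED_BY_APPMASTER,
  ContainerExitStatus.KILLED_AFTER_APP_COMPLETION,
  ContainerExitStatus.ABORTED,
  ContainerExitStatus.DISKS_FAILED)

// Hypothetical helper: returns (exitCausedByApp, shouldBlacklistNode) for an
// exit status that was not matched by the earlier, more specific cases.
def classify(exitStatus: Int): (Boolean, Boolean) =
  if (notAppOrNodeFault.contains(exitStatus)) (false, false) else (true, true)
```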
## How was this patch tested?
Added unit tests in `YarnAllocatorSuite` and updated `YarnAllocatorBlacklistTrackerSuite`.
Closes #23223 from Ngone51/dev-YarnAllocator-should-have-same-blacklist-behaviour-with-YARN.
Lead-authored-by: wuyi <ng...@163.com>
Co-authored-by: Ngone51 <ng...@163.com>
Signed-off-by: Thomas Graves <tg...@apache.org>
---
.../apache/spark/deploy/yarn/YarnAllocator.scala | 32 +++++++--
.../yarn/YarnAllocatorBlacklistTracker.scala | 4 +-
.../yarn/YarnAllocatorBlacklistTrackerSuite.scala | 2 +-
.../spark/deploy/yarn/YarnAllocatorSuite.scala | 75 +++++++++++++++++++++-
4 files changed, 101 insertions(+), 12 deletions(-)
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 54b1ec2..a3feca5 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -607,13 +607,23 @@ private[yarn] class YarnAllocator(
val message = "Container killed by YARN for exceeding physical memory limits. " +
s"$diag Consider boosting ${EXECUTOR_MEMORY_OVERHEAD.key}."
(true, message)
- case _ =>
- // all the failures which not covered above, like:
- // disk failure, kill by app master or resource manager, ...
- allocatorBlacklistTracker.handleResourceAllocationFailure(hostOpt)
- (true, "Container marked as failed: " + containerId + onHostStr +
- ". Exit status: " + completedContainer.getExitStatus +
- ". Diagnostics: " + completedContainer.getDiagnostics)
+ case other_exit_status =>
+ // SPARK-26269: follow YARN's blacklisting behaviour(see https://github
+ // .com/apache/hadoop/blob/228156cfd1b474988bc4fedfbf7edddc87db41e3/had
+ // oop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/ap
+ // ache/hadoop/yarn/util/Apps.java#L273 for details)
+ if (NOT_APP_AND_SYSTEM_FAULT_EXIT_STATUS.contains(other_exit_status)) {
+ (false, s"Container marked as failed: $containerId$onHostStr" +
+ s". Exit status: ${completedContainer.getExitStatus}" +
+ s". Diagnostics: ${completedContainer.getDiagnostics}.")
+ } else {
+ // completed container from a bad node
+ allocatorBlacklistTracker.handleResourceAllocationFailure(hostOpt)
+ (true, s"Container from a bad node: $containerId$onHostStr" +
+ s". Exit status: ${completedContainer.getExitStatus}" +
+ s". Diagnostics: ${completedContainer.getDiagnostics}.")
+ }
+
}
if (exitCausedByApp) {
@@ -739,4 +749,12 @@ private object YarnAllocator {
val MEM_REGEX = "[0-9.]+ [KMG]B"
val VMEM_EXCEEDED_EXIT_CODE = -103
val PMEM_EXCEEDED_EXIT_CODE = -104
+
+ val NOT_APP_AND_SYSTEM_FAULT_EXIT_STATUS = Set(
+ ContainerExitStatus.KILLED_BY_RESOURCEMANAGER,
+ ContainerExitStatus.KILLED_BY_APPMASTER,
+ ContainerExitStatus.KILLED_AFTER_APP_COMPLETION,
+ ContainerExitStatus.ABORTED,
+ ContainerExitStatus.DISKS_FAILED
+ )
}
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTracker.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTracker.scala
index ceac7cd..268976b 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTracker.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTracker.scala
@@ -120,7 +120,9 @@ private[spark] class YarnAllocatorBlacklistTracker(
if (removals.nonEmpty) {
logInfo(s"removing nodes from YARN application master's blacklist: $removals")
}
- amClient.updateBlacklist(additions.asJava, removals.asJava)
+ if (additions.nonEmpty || removals.nonEmpty) {
+ amClient.updateBlacklist(additions.asJava, removals.asJava)
+ }
currentBlacklistedYarnNodes = nodesToBlacklist
}
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTrackerSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTrackerSuite.scala
index aeac68e..2019107 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTrackerSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTrackerSuite.scala
@@ -87,7 +87,7 @@ class YarnAllocatorBlacklistTrackerSuite extends SparkFunSuite with Matchers
// expired blacklisted nodes (simulating a resource request)
yarnBlacklistTracker.setSchedulerBlacklistedNodes(Set("host1", "host2"))
// no change is communicated to YARN regarding the blacklisting
- verify(amClientMock).updateBlacklist(Collections.emptyList(), Collections.emptyList())
+ verify(amClientMock, times(0)).updateBlacklist(Collections.emptyList(), Collections.emptyList())
}
test("combining scheduler and allocation blacklist") {
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index b61e7df..53a538d 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -17,6 +17,8 @@
package org.apache.spark.deploy.yarn
+import java.util.Collections
+
import scala.collection.JavaConverters._
import org.apache.hadoop.conf.Configuration
@@ -114,13 +116,29 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
clock)
}
- def createContainer(host: String, resource: Resource = containerResource): Container = {
- val containerId = ContainerId.newContainerId(appAttemptId, containerNum)
+ def createContainer(
+ host: String,
+ containerNumber: Int = containerNum,
+ resource: Resource = containerResource): Container = {
+ val containerId: ContainerId = ContainerId.newContainerId(appAttemptId, containerNum)
containerNum += 1
val nodeId = NodeId.newInstance(host, 1000)
Container.newInstance(containerId, nodeId, "", resource, RM_REQUEST_PRIORITY, null)
}
+ def createContainers(hosts: Seq[String], containerIds: Seq[Int]): Seq[Container] = {
+ hosts.zip(containerIds).map{case (host, id) => createContainer(host, id)}
+ }
+
+ def createContainerStatus(
+ containerId: ContainerId,
+ exitStatus: Int,
+ containerState: ContainerState = ContainerState.COMPLETE,
+ diagnostics: String = "diagnostics"): ContainerStatus = {
+ ContainerStatus.newInstance(containerId, containerState, diagnostics, exitStatus)
+ }
+
+
test("single container allocated") {
// request a single container and receive it
val handler = createAllocator(1)
@@ -148,7 +166,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
Map(YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "gpu" -> "2G"))
handler.updateResourceRequests()
- val container = createContainer("host1", handler.resource)
+ val container = createContainer("host1", resource = handler.resource)
handler.handleAllocatedContainers(Array(container))
// get amount of memory and vcores from resource, so effectively skipping their validation
@@ -417,4 +435,55 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
clock.advance(50 * 1000L)
handler.getNumExecutorsFailed should be (0)
}
+
+ test("SPARK-26269: YarnAllocator should have same blacklist behaviour with YARN") {
+ val rmClientSpy = spy(rmClient)
+ val maxExecutors = 11
+
+ val handler = createAllocator(
+ maxExecutors,
+ rmClientSpy,
+ Map(
+ "spark.yarn.blacklist.executor.launch.blacklisting.enabled" -> "true",
+ "spark.blacklist.application.maxFailedExecutorsPerNode" -> "0"))
+ handler.updateResourceRequests()
+
+ val hosts = (0 until maxExecutors).map(i => s"host$i")
+ val ids = 0 to maxExecutors
+ val containers = createContainers(hosts, ids)
+
+ val nonBlacklistedStatuses = Seq(
+ ContainerExitStatus.SUCCESS,
+ ContainerExitStatus.PREEMPTED,
+ ContainerExitStatus.KILLED_EXCEEDED_VMEM,
+ ContainerExitStatus.KILLED_EXCEEDED_PMEM,
+ ContainerExitStatus.KILLED_BY_RESOURCEMANAGER,
+ ContainerExitStatus.KILLED_BY_APPMASTER,
+ ContainerExitStatus.KILLED_AFTER_APP_COMPLETION,
+ ContainerExitStatus.ABORTED,
+ ContainerExitStatus.DISKS_FAILED)
+
+ val nonBlacklistedContainerStatuses = nonBlacklistedStatuses.zipWithIndex.map {
+ case (exitStatus, idx) => createContainerStatus(containers(idx).getId, exitStatus)
+ }
+
+ val BLACKLISTED_EXIT_CODE = 1
+ val blacklistedStatuses = Seq(ContainerExitStatus.INVALID, BLACKLISTED_EXIT_CODE)
+
+ val blacklistedContainerStatuses = blacklistedStatuses.zip(9 until maxExecutors).map {
+ case (exitStatus, idx) => createContainerStatus(containers(idx).getId, exitStatus)
+ }
+
+ handler.handleAllocatedContainers(containers.slice(0, 9))
+ handler.processCompletedContainers(nonBlacklistedContainerStatuses)
+ verify(rmClientSpy, never())
+ .updateBlacklist(hosts.slice(0, 9).asJava, Collections.emptyList())
+
+ handler.handleAllocatedContainers(containers.slice(9, 11))
+ handler.processCompletedContainers(blacklistedContainerStatuses)
+ verify(rmClientSpy)
+ .updateBlacklist(hosts.slice(9, 10).asJava, Collections.emptyList())
+ verify(rmClientSpy)
+ .updateBlacklist(hosts.slice(10, 11).asJava, Collections.emptyList())
+ }
}
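Editorial note, not part of the commit: the test above exercises this path through two configuration properties. A minimal, illustrative sketch of setting them on a `SparkConf` (property names are taken from the test; the values shown are examples, not recommendations):

```scala
import org.apache.spark.SparkConf

// Illustrative only: enable YARN allocator-level blacklisting so that
// container failures attributed to a node can exclude that node from
// future container requests. Values here mirror the test above.
val conf = new SparkConf()
  .set("spark.yarn.blacklist.executor.launch.blacklisting.enabled", "true")
  .set("spark.blacklist.application.maxFailedExecutorsPerNode", "0")
```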