You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by tg...@apache.org on 2018/12/21 19:24:11 UTC

[spark] branch master updated: [SPARK-26269][YARN] YarnAllocator should have the same blacklist behaviour as YARN to maximize use of cluster resources

This is an automated email from the ASF dual-hosted git repository.

tgraves pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d6a5f85  [SPARK-26269][YARN] YarnAllocator should have the same blacklist behaviour as YARN to maximize use of cluster resources
d6a5f85 is described below

commit d6a5f859848bbd237e19075dd26e1547fb3af417
Author: wuyi <ng...@163.com>
AuthorDate: Fri Dec 21 13:21:58 2018 -0600

    [SPARK-26269][YARN] YarnAllocator should have the same blacklist behaviour as YARN to maximize use of cluster resources
    
    ## What changes were proposed in this pull request?
    
    As mentioned in JIRA [SPARK-26269](https://issues.apache.org/jira/browse/SPARK-26269), in order to maximize the use of cluster resources, this PR makes `YarnAllocator` follow the same blacklisting behaviour as YARN.
    
    ## How was this patch tested?
    
    Added a unit test to `YarnAllocatorSuite`.
    
    Closes #23223 from Ngone51/dev-YarnAllocator-should-have-same-blacklist-behaviour-with-YARN.
    
    Lead-authored-by: wuyi <ng...@163.com>
    Co-authored-by: Ngone51 <ng...@163.com>
    Signed-off-by: Thomas Graves <tg...@apache.org>
---
 .../apache/spark/deploy/yarn/YarnAllocator.scala   | 32 +++++++--
 .../yarn/YarnAllocatorBlacklistTracker.scala       |  4 +-
 .../yarn/YarnAllocatorBlacklistTrackerSuite.scala  |  2 +-
 .../spark/deploy/yarn/YarnAllocatorSuite.scala     | 75 +++++++++++++++++++++-
 4 files changed, 101 insertions(+), 12 deletions(-)

diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 54b1ec2..a3feca5 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -607,13 +607,23 @@ private[yarn] class YarnAllocator(
             val message = "Container killed by YARN for exceeding physical memory limits. " +
               s"$diag Consider boosting ${EXECUTOR_MEMORY_OVERHEAD.key}."
             (true, message)
-          case _ =>
-            // all the failures which not covered above, like:
-            // disk failure, kill by app master or resource manager, ...
-            allocatorBlacklistTracker.handleResourceAllocationFailure(hostOpt)
-            (true, "Container marked as failed: " + containerId + onHostStr +
-              ". Exit status: " + completedContainer.getExitStatus +
-              ". Diagnostics: " + completedContainer.getDiagnostics)
+          case other_exit_status =>
+            // SPARK-26269: follow YARN's blacklisting behaviour (see https://github
+            // .com/apache/hadoop/blob/228156cfd1b474988bc4fedfbf7edddc87db41e3/had
+            // oop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/ap
+            // ache/hadoop/yarn/util/Apps.java#L273 for details)
+            if (NOT_APP_AND_SYSTEM_FAULT_EXIT_STATUS.contains(other_exit_status)) {
+              (false, s"Container marked as failed: $containerId$onHostStr" +
+                s". Exit status: ${completedContainer.getExitStatus}" +
+                s". Diagnostics: ${completedContainer.getDiagnostics}.")
+            } else {
+              // completed container from a bad node
+              allocatorBlacklistTracker.handleResourceAllocationFailure(hostOpt)
+              (true, s"Container from a bad node: $containerId$onHostStr" +
+                s". Exit status: ${completedContainer.getExitStatus}" +
+                s". Diagnostics: ${completedContainer.getDiagnostics}.")
+            }
+
 
         }
         if (exitCausedByApp) {
@@ -739,4 +749,12 @@ private object YarnAllocator {
   val MEM_REGEX = "[0-9.]+ [KMG]B"
   val VMEM_EXCEEDED_EXIT_CODE = -103
   val PMEM_EXCEEDED_EXIT_CODE = -104
+
+  val NOT_APP_AND_SYSTEM_FAULT_EXIT_STATUS = Set(
+    ContainerExitStatus.KILLED_BY_RESOURCEMANAGER,
+    ContainerExitStatus.KILLED_BY_APPMASTER,
+    ContainerExitStatus.KILLED_AFTER_APP_COMPLETION,
+    ContainerExitStatus.ABORTED,
+    ContainerExitStatus.DISKS_FAILED
+  )
 }
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTracker.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTracker.scala
index ceac7cd..268976b 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTracker.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTracker.scala
@@ -120,7 +120,9 @@ private[spark] class YarnAllocatorBlacklistTracker(
     if (removals.nonEmpty) {
       logInfo(s"removing nodes from YARN application master's blacklist: $removals")
     }
-    amClient.updateBlacklist(additions.asJava, removals.asJava)
+    if (additions.nonEmpty || removals.nonEmpty) {
+      amClient.updateBlacklist(additions.asJava, removals.asJava)
+    }
     currentBlacklistedYarnNodes = nodesToBlacklist
   }
 
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTrackerSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTrackerSuite.scala
index aeac68e..2019107 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTrackerSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTrackerSuite.scala
@@ -87,7 +87,7 @@ class YarnAllocatorBlacklistTrackerSuite extends SparkFunSuite with Matchers
     // expired blacklisted nodes (simulating a resource request)
     yarnBlacklistTracker.setSchedulerBlacklistedNodes(Set("host1", "host2"))
     // no change is communicated to YARN regarding the blacklisting
-    verify(amClientMock).updateBlacklist(Collections.emptyList(), Collections.emptyList())
+    verify(amClientMock, times(0)).updateBlacklist(Collections.emptyList(), Collections.emptyList())
   }
 
   test("combining scheduler and allocation blacklist") {
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index b61e7df..53a538d 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.deploy.yarn
 
+import java.util.Collections
+
 import scala.collection.JavaConverters._
 
 import org.apache.hadoop.conf.Configuration
@@ -114,13 +116,29 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
       clock)
   }
 
-  def createContainer(host: String, resource: Resource = containerResource): Container = {
-    val containerId = ContainerId.newContainerId(appAttemptId, containerNum)
+  def createContainer(
+      host: String,
+      containerNumber: Int = containerNum,
+      resource: Resource = containerResource): Container = {
+    val containerId = ContainerId.newContainerId(appAttemptId, containerNumber) // requested id
     containerNum += 1
     val nodeId = NodeId.newInstance(host, 1000)
     Container.newInstance(containerId, nodeId, "", resource, RM_REQUEST_PRIORITY, null)
   }
 
+  def createContainers(hosts: Seq[String], containerIds: Seq[Int]): Seq[Container] = {
+    hosts.zip(containerIds).map{case (host, id) => createContainer(host, id)}
+  }
+
+  def createContainerStatus(
+      containerId: ContainerId,
+      exitStatus: Int,
+      containerState: ContainerState = ContainerState.COMPLETE,
+      diagnostics: String = "diagnostics"): ContainerStatus = {
+    ContainerStatus.newInstance(containerId, containerState, diagnostics, exitStatus)
+  }
+
+
   test("single container allocated") {
     // request a single container and receive it
     val handler = createAllocator(1)
@@ -148,7 +166,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
       Map(YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "gpu" -> "2G"))
 
     handler.updateResourceRequests()
-    val container = createContainer("host1", handler.resource)
+    val container = createContainer("host1", resource = handler.resource)
     handler.handleAllocatedContainers(Array(container))
 
     // get amount of memory and vcores from resource, so effectively skipping their validation
@@ -417,4 +435,55 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
     clock.advance(50 * 1000L)
     handler.getNumExecutorsFailed should be (0)
   }
+
+  test("SPARK-26269: YarnAllocator should have same blacklist behaviour with YARN") {
+    val rmClientSpy = spy(rmClient)
+    val maxExecutors = 11
+
+    val handler = createAllocator(
+      maxExecutors,
+      rmClientSpy,
+      Map(
+        "spark.yarn.blacklist.executor.launch.blacklisting.enabled" -> "true",
+        "spark.blacklist.application.maxFailedExecutorsPerNode" -> "0"))
+    handler.updateResourceRequests()
+
+    val hosts = (0 until maxExecutors).map(i => s"host$i")
+    val ids = 0 to maxExecutors
+    val containers = createContainers(hosts, ids)
+
+    val nonBlacklistedStatuses = Seq(
+      ContainerExitStatus.SUCCESS,
+      ContainerExitStatus.PREEMPTED,
+      ContainerExitStatus.KILLED_EXCEEDED_VMEM,
+      ContainerExitStatus.KILLED_EXCEEDED_PMEM,
+      ContainerExitStatus.KILLED_BY_RESOURCEMANAGER,
+      ContainerExitStatus.KILLED_BY_APPMASTER,
+      ContainerExitStatus.KILLED_AFTER_APP_COMPLETION,
+      ContainerExitStatus.ABORTED,
+      ContainerExitStatus.DISKS_FAILED)
+
+    val nonBlacklistedContainerStatuses = nonBlacklistedStatuses.zipWithIndex.map {
+      case (exitStatus, idx) => createContainerStatus(containers(idx).getId, exitStatus)
+    }
+
+    val BLACKLISTED_EXIT_CODE = 1
+    val blacklistedStatuses = Seq(ContainerExitStatus.INVALID, BLACKLISTED_EXIT_CODE)
+
+    val blacklistedContainerStatuses = blacklistedStatuses.zip(9 until maxExecutors).map {
+      case (exitStatus, idx) => createContainerStatus(containers(idx).getId, exitStatus)
+    }
+
+    handler.handleAllocatedContainers(containers.slice(0, 9))
+    handler.processCompletedContainers(nonBlacklistedContainerStatuses)
+    verify(rmClientSpy, never())
+      .updateBlacklist(hosts.slice(0, 9).asJava, Collections.emptyList())
+
+    handler.handleAllocatedContainers(containers.slice(9, 11))
+    handler.processCompletedContainers(blacklistedContainerStatuses)
+    verify(rmClientSpy)
+      .updateBlacklist(hosts.slice(9, 10).asJava, Collections.emptyList())
+    verify(rmClientSpy)
+      .updateBlacklist(hosts.slice(10, 11).asJava, Collections.emptyList())
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org