You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2018/12/21 19:27:44 UTC

[GitHub] asfgit closed pull request #23223: [SPARK-26269][YARN] YarnAllocator should have same blacklist behaviour with YARN to maximize use of cluster resource

asfgit closed pull request #23223: [SPARK-26269][YARN] YarnAllocator should have same blacklist behaviour with YARN to maximize use of cluster resource
URL: https://github.com/apache/spark/pull/23223
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 9497530805c1a..e158d96149622 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -612,13 +612,23 @@ private[yarn] class YarnAllocator(
             val message = "Container killed by YARN for exceeding physical memory limits. " +
               s"$diag Consider boosting ${EXECUTOR_MEMORY_OVERHEAD.key}."
             (true, message)
-          case _ =>
-            // all the failures which not covered above, like:
-            // disk failure, kill by app master or resource manager, ...
-            allocatorBlacklistTracker.handleResourceAllocationFailure(hostOpt)
-            (true, "Container marked as failed: " + containerId + onHostStr +
-              ". Exit status: " + completedContainer.getExitStatus +
-              ". Diagnostics: " + completedContainer.getDiagnostics)
+          case other_exit_status =>
+            // SPARK-26269: follow YARN's blacklisting behaviour(see https://github
+            // .com/apache/hadoop/blob/228156cfd1b474988bc4fedfbf7edddc87db41e3/had
+            // oop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/ap
+            // ache/hadoop/yarn/util/Apps.java#L273 for details)
+            if (NOT_APP_AND_SYSTEM_FAULT_EXIT_STATUS.contains(other_exit_status)) {
+              (false, s"Container marked as failed: $containerId$onHostStr" +
+                s". Exit status: ${completedContainer.getExitStatus}" +
+                s". Diagnostics: ${completedContainer.getDiagnostics}.")
+            } else {
+              // completed container from a bad node
+              allocatorBlacklistTracker.handleResourceAllocationFailure(hostOpt)
+              (true, s"Container from a bad node: $containerId$onHostStr" +
+                s". Exit status: ${completedContainer.getExitStatus}" +
+                s". Diagnostics: ${completedContainer.getDiagnostics}.")
+            }
+
 
         }
         if (exitCausedByApp) {
@@ -744,4 +754,12 @@ private object YarnAllocator {
   val MEM_REGEX = "[0-9.]+ [KMG]B"
   val VMEM_EXCEEDED_EXIT_CODE = -103
   val PMEM_EXCEEDED_EXIT_CODE = -104
+
+  val NOT_APP_AND_SYSTEM_FAULT_EXIT_STATUS = Set(  // exit statuses handled without blacklisting
+    ContainerExitStatus.KILLED_BY_RESOURCEMANAGER,
+    ContainerExitStatus.KILLED_BY_APPMASTER,
+    ContainerExitStatus.KILLED_AFTER_APP_COMPLETION,
+    ContainerExitStatus.ABORTED,
+    ContainerExitStatus.DISKS_FAILED
+  )
 }
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTracker.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTracker.scala
index ceac7cda5f8be..268976b629507 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTracker.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTracker.scala
@@ -120,7 +120,9 @@ private[spark] class YarnAllocatorBlacklistTracker(
     if (removals.nonEmpty) {
       logInfo(s"removing nodes from YARN application master's blacklist: $removals")
     }
-    amClient.updateBlacklist(additions.asJava, removals.asJava)
+    if (additions.nonEmpty || removals.nonEmpty) {
+      amClient.updateBlacklist(additions.asJava, removals.asJava)
+    }
     currentBlacklistedYarnNodes = nodesToBlacklist
   }
 
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTrackerSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTrackerSuite.scala
index aeac68e6ed330..201910731e934 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTrackerSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTrackerSuite.scala
@@ -87,7 +87,7 @@ class YarnAllocatorBlacklistTrackerSuite extends SparkFunSuite with Matchers
     // expired blacklisted nodes (simulating a resource request)
     yarnBlacklistTracker.setSchedulerBlacklistedNodes(Set("host1", "host2"))
     // no change is communicated to YARN regarding the blacklisting
-    verify(amClientMock).updateBlacklist(Collections.emptyList(), Collections.emptyList())
+    verify(amClientMock, times(0)).updateBlacklist(Collections.emptyList(), Collections.emptyList())
   }
 
   test("combining scheduler and allocation blacklist") {
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index b61e7df4420ef..53a538dc1de29 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.deploy.yarn
 
+import java.util.Collections
+
 import scala.collection.JavaConverters._
 
 import org.apache.hadoop.conf.Configuration
@@ -114,13 +116,29 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
       clock)
   }
 
-  def createContainer(host: String, resource: Resource = containerResource): Container = {
-    val containerId = ContainerId.newContainerId(appAttemptId, containerNum)
+  def createContainer(
+      host: String,
+      containerNumber: Int = containerNum,  // container id; defaults to the running counter
+      resource: Resource = containerResource): Container = {
+    val containerId: ContainerId = ContainerId.newContainerId(appAttemptId, containerNumber)
     containerNum += 1
     val nodeId = NodeId.newInstance(host, 1000)
     Container.newInstance(containerId, nodeId, "", resource, RM_REQUEST_PRIORITY, null)
   }
 
+  def createContainers(hosts: Seq[String], containerIds: Seq[Int]): Seq[Container] = {
+    for ((host, id) <- hosts.zip(containerIds)) yield createContainer(host, id)
+  }
+
+  def createContainerStatus(  // test helper wrapping ContainerStatus.newInstance
+      containerId: ContainerId,
+      exitStatus: Int,
+      containerState: ContainerState = ContainerState.COMPLETE,
+      diagnostics: String = "diagnostics"): ContainerStatus = {
+    ContainerStatus.newInstance(containerId, containerState, diagnostics, exitStatus)
+  }
+
+
   test("single container allocated") {
     // request a single container and receive it
     val handler = createAllocator(1)
@@ -148,7 +166,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
       Map(YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "gpu" -> "2G"))
 
     handler.updateResourceRequests()
-    val container = createContainer("host1", handler.resource)
+    val container = createContainer("host1", resource = handler.resource)
     handler.handleAllocatedContainers(Array(container))
 
     // get amount of memory and vcores from resource, so effectively skipping their validation
@@ -417,4 +435,55 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
     clock.advance(50 * 1000L)
     handler.getNumExecutorsFailed should be (0)
   }
+
+  test("SPARK-26269: YarnAllocator should have same blacklist behaviour with YARN") {
+    val rmClientSpy = spy(rmClient)
+    val maxExecutors = 11
+
+    val handler = createAllocator(
+      maxExecutors,
+      rmClientSpy,
+      Map(
+        "spark.yarn.blacklist.executor.launch.blacklisting.enabled" -> "true",
+        "spark.blacklist.application.maxFailedExecutorsPerNode" -> "0"))
+    handler.updateResourceRequests()
+
+    val hosts = (0 until maxExecutors).map(i => s"host$i")
+    val ids = 0 until maxExecutors  // one id per host; `to` produced an extra, unused id
+    val containers = createContainers(hosts, ids)
+
+    val nonBlacklistedStatuses = Seq(
+      ContainerExitStatus.SUCCESS,
+      ContainerExitStatus.PREEMPTED,
+      ContainerExitStatus.KILLED_EXCEEDED_VMEM,
+      ContainerExitStatus.KILLED_EXCEEDED_PMEM,
+      ContainerExitStatus.KILLED_BY_RESOURCEMANAGER,
+      ContainerExitStatus.KILLED_BY_APPMASTER,
+      ContainerExitStatus.KILLED_AFTER_APP_COMPLETION,
+      ContainerExitStatus.ABORTED,
+      ContainerExitStatus.DISKS_FAILED)
+
+    val nonBlacklistedContainerStatuses = nonBlacklistedStatuses.zipWithIndex.map {
+      case (exitStatus, idx) => createContainerStatus(containers(idx).getId, exitStatus)
+    }
+
+    val BLACKLISTED_EXIT_CODE = 1
+    val blacklistedStatuses = Seq(ContainerExitStatus.INVALID, BLACKLISTED_EXIT_CODE)
+
+    val blacklistedContainerStatuses = blacklistedStatuses.zip(9 until maxExecutors).map {
+      case (exitStatus, idx) => createContainerStatus(containers(idx).getId, exitStatus)
+    }
+
+    handler.handleAllocatedContainers(containers.slice(0, 9))
+    handler.processCompletedContainers(nonBlacklistedContainerStatuses)
+    verify(rmClientSpy, never())  // NOTE(review): only rules out one call listing all 9 hosts
+      .updateBlacklist(hosts.slice(0, 9).asJava, Collections.emptyList())
+
+    handler.handleAllocatedContainers(containers.slice(9, 11))
+    handler.processCompletedContainers(blacklistedContainerStatuses)
+    verify(rmClientSpy)
+      .updateBlacklist(hosts.slice(9, 10).asJava, Collections.emptyList())
+    verify(rmClientSpy)
+      .updateBlacklist(hosts.slice(10, 11).asJava, Collections.emptyList())
+  }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org