You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by tg...@apache.org on 2019/05/14 13:41:59 UTC

[spark] branch master updated: [SPARK-27024] Executor interface for cluster managers to support GPU and other resources

This is an automated email from the ASF dual-hosted git repository.

tgraves pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new db2e3c4  [SPARK-27024] Executor interface for cluster managers to support GPU and other resources
db2e3c4 is described below

commit db2e3c43412e4a7fb4a46c58d73d9ab304a1e949
Author: Thomas Graves <tg...@nvidia.com>
AuthorDate: Tue May 14 08:41:41 2019 -0500

    [SPARK-27024] Executor interface for cluster managers to support GPU and other resources
    
    ## What changes were proposed in this pull request?
    
    Add in GPU and generic resource type allocation to the executors.
    
    Note this is part of a bigger feature for gpu-aware scheduling and is just how the executor find the resources. The general flow :
    
       - users ask for a certain set of resources, for instance number of gpus - each cluster manager has a specific way to do this.
      -  cluster manager allocates a container or set of resources (standalone mode)
    -    When spark launches the executor in that container, the executor either has to be told what resources it has or it has to auto discover them.
      -  Executor has to register with Driver and tell the driver the set of resources it has so the scheduler can use that to schedule tasks that requires a certain amount of each of those resources
    
    In this pr I added configs and arguments to the executor to be able discover resources. The argument to the executor is intended to be used by standalone mode or other cluster managers that don't have isolation so that it can assign specific resources to specific executors in case there are multiple executors on a node. The argument is a file contains JSON Array of ResourceInformation objects.
    
    The discovery script is meant to be used in an isolated environment where the executor only sees the resources it should use.
    
    Note that there will be follow on PRs to add other parts like the scheduler part. See the epic high level jira: https://issues.apache.org/jira/browse/SPARK-24615
    
    ## How was this patch tested?
    
    Added unit tests and manually tested.
    
    Please review http://spark.apache.org/contributing.html before opening a pull request.
    
    Closes #24406 from tgravescs/gpu-sched-executor-clean.
    
    Authored-by: Thomas Graves <tg...@nvidia.com>
    Signed-off-by: Thomas Graves <tg...@apache.org>
---
 .../org/apache/spark/ResourceDiscoverer.scala      |  93 ++++++++
 .../org/apache/spark/ResourceInformation.scala     |  37 +++
 .../executor/CoarseGrainedExecutorBackend.scala    | 120 +++++++++-
 .../org/apache/spark/internal/config/package.scala |   7 +
 .../cluster/CoarseGrainedClusterMessage.scala      |   4 +-
 .../cluster/CoarseGrainedSchedulerBackend.scala    |   3 +-
 .../org/apache/spark/HeartbeatReceiverSuite.scala  |   6 +-
 .../org/apache/spark/ResourceDiscovererSuite.scala | 186 +++++++++++++++
 .../deploy/StandaloneDynamicAllocationSuite.scala  |   6 +-
 .../CoarseGrainedExecutorBackendSuite.scala        | 262 +++++++++++++++++++++
 .../CoarseGrainedSchedulerBackendSuite.scala       |   6 +-
 docs/configuration.md                              |  28 +++
 .../MesosCoarseGrainedSchedulerBackendSuite.scala  |   2 +-
 .../YarnCoarseGrainedExecutorBackend.scala         |   8 +-
 14 files changed, 749 insertions(+), 19 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ResourceDiscoverer.scala b/core/src/main/scala/org/apache/spark/ResourceDiscoverer.scala
new file mode 100644
index 0000000..1963942
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ResourceDiscoverer.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.io.File
+
+import com.fasterxml.jackson.core.JsonParseException
+import org.json4s.{DefaultFormats, MappingException}
+import org.json4s.JsonAST.JValue
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.util.Utils.executeAndGetOutput
+
+/**
+ * Discovers resources (GPUs/FPGAs/etc). It currently only supports resources that have
+ * addresses.
+ * This class finds resources by running and parsing the output of the user specified script
+ * from the config spark.{driver/executor}.resource.{resourceType}.discoveryScript.
+ * The output of the script it runs is expected to be JSON in the format of the
+ * ResourceInformation class.
+ *
+ * For example:  {"name": "gpu", "addresses": ["0","1"]}
+ */
+private[spark] object ResourceDiscoverer extends Logging {
+
+  private implicit val formats = DefaultFormats
+
+  def findResources(sparkConf: SparkConf, isDriver: Boolean): Map[String, ResourceInformation] = {
+    val prefix = if (isDriver) {
+      SPARK_DRIVER_RESOURCE_PREFIX
+    } else {
+      SPARK_EXECUTOR_RESOURCE_PREFIX
+    }
+    // get unique resource types by grabbing first part config with multiple periods,
+    // ie resourceType.count, grab resourceType part
+    val resourceNames = sparkConf.getAllWithPrefix(prefix).map { case (k, _) =>
+      k.split('.').head
+    }.toSet
+    resourceNames.map { rName => {
+      val rInfo = getResourceInfoForType(sparkConf, prefix, rName)
+      (rName -> rInfo)
+    }}.toMap
+  }
+
+  private def getResourceInfoForType(
+      sparkConf: SparkConf,
+      prefix: String,
+      resourceType: String): ResourceInformation = {
+    val discoveryConf = prefix + resourceType + SPARK_RESOURCE_DISCOVERY_SCRIPT_POSTFIX
+    val script = sparkConf.getOption(discoveryConf)
+    val result = if (script.nonEmpty) {
+      val scriptFile = new File(script.get)
+      // check that script exists and try to execute
+      if (scriptFile.exists()) {
+        try {
+          val output = executeAndGetOutput(Seq(script.get), new File("."))
+          val parsedJson = parse(output)
+          val name = (parsedJson \ "name").extract[String]
+          val addresses = (parsedJson \ "addresses").extract[Array[String]].toArray
+          new ResourceInformation(name, addresses)
+        } catch {
+          case e @ (_: SparkException | _: MappingException | _: JsonParseException) =>
+            throw new SparkException(s"Error running the resource discovery script: $scriptFile" +
+              s" for $resourceType", e)
+        }
+      } else {
+        throw new SparkException(s"Resource script: $scriptFile to discover $resourceType" +
+          s" doesn't exist!")
+      }
+    } else {
+      throw new SparkException(s"User is expecting to use $resourceType resources but " +
+        s"didn't specify a script via conf: $discoveryConf, to find them!")
+    }
+    result
+  }
+}
diff --git a/core/src/main/scala/org/apache/spark/ResourceInformation.scala b/core/src/main/scala/org/apache/spark/ResourceInformation.scala
new file mode 100644
index 0000000..6a5b725
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ResourceInformation.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import org.apache.spark.annotation.Evolving
+
+/**
+ * Class to hold information about a type of Resource. A resource could be a GPU, FPGA, etc.
+ * The array of addresses are resource specific and its up to the user to interpret the address.
+ *
+ * One example is GPUs, where the addresses would be the indices of the GPUs
+ *
+ * @param name the name of the resource
+ * @param addresses an array of strings describing the addresses of the resource
+ */
+@Evolving
+class ResourceInformation(
+    val name: String,
+    val addresses: Array[String]) extends Serializable {
+
+  override def toString: String = s"[name: ${name}, addresses: ${addresses.mkString(",")}]"
+}
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index f57b731..af01e0b 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.executor
 
+import java.io.{BufferedInputStream, FileInputStream}
 import java.net.URL
 import java.nio.ByteBuffer
 import java.util.Locale
@@ -26,12 +27,18 @@ import scala.collection.mutable
 import scala.util.{Failure, Success}
 import scala.util.control.NonFatal
 
+import com.fasterxml.jackson.databind.exc.MismatchedInputException
+import org.json4s.DefaultFormats
+import org.json4s.JsonAST.JArray
+import org.json4s.MappingException
+import org.json4s.jackson.JsonMethods._
+
 import org.apache.spark._
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.worker.WorkerWatcher
 import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.EXECUTOR_ID
+import org.apache.spark.internal.config._
 import org.apache.spark.rpc._
 import org.apache.spark.scheduler.{ExecutorLossReason, TaskDescription}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
@@ -45,9 +52,12 @@ private[spark] class CoarseGrainedExecutorBackend(
     hostname: String,
     cores: Int,
     userClassPath: Seq[URL],
-    env: SparkEnv)
+    env: SparkEnv,
+    resourcesFile: Option[String])
   extends ThreadSafeRpcEndpoint with ExecutorBackend with Logging {
 
+  private implicit val formats = DefaultFormats
+
   private[this] val stopping = new AtomicBoolean(false)
   var executor: Executor = null
   @volatile var driver: Option[RpcEndpointRef] = None
@@ -58,11 +68,12 @@ private[spark] class CoarseGrainedExecutorBackend(
 
   override def onStart() {
     logInfo("Connecting to driver: " + driverUrl)
+    val resources = parseOrFindResources(resourcesFile)
     rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
       // This is a very fast action so we can use "ThreadUtils.sameThread"
       driver = Some(ref)
       ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls,
-        extractAttributes))
+        extractAttributes, resources))
     }(ThreadUtils.sameThread).onComplete {
       // This is a very fast action so we can use "ThreadUtils.sameThread"
       case Success(msg) =>
@@ -72,6 +83,97 @@ private[spark] class CoarseGrainedExecutorBackend(
     }(ThreadUtils.sameThread)
   }
 
+  // Check that the actual resources discovered will satisfy the user specified
+  // requirements and that they match the configs specified by the user to catch
+  // mismatches between what the user requested and what resource manager gave or
+  // what the discovery script found.
+  private def checkResourcesMeetRequirements(
+      resourceConfigPrefix: String,
+      reqResourcesAndCounts: Array[(String, String)],
+      actualResources: Map[String, ResourceInformation]): Unit = {
+
+    reqResourcesAndCounts.foreach { case (rName, reqCount) =>
+      if (actualResources.contains(rName)) {
+        val resourceInfo = actualResources(rName)
+
+        if (resourceInfo.addresses.size < reqCount.toLong) {
+          throw new SparkException(s"Resource: $rName with addresses: " +
+            s"${resourceInfo.addresses.mkString(",")} doesn't meet the " +
+            s"requirements of needing $reqCount of them")
+        }
+        // also make sure the resource count on start matches the
+        // resource configs specified by user
+        val userCountConfigName =
+          resourceConfigPrefix + rName + SPARK_RESOURCE_COUNT_POSTFIX
+        val userConfigCount = env.conf.getOption(userCountConfigName).
+          getOrElse(throw new SparkException(s"Resource: $rName not specified " +
+            s"via config: $userCountConfigName, but required, " +
+            "please fix your configuration"))
+
+        if (userConfigCount.toLong > resourceInfo.addresses.size) {
+          throw new SparkException(s"Resource: $rName, with addresses: " +
+            s"${resourceInfo.addresses.mkString(",")} " +
+            s"is less than what the user requested for count: $userConfigCount, " +
+            s"via $userCountConfigName")
+        }
+      } else {
+        throw new SparkException(s"Executor resource config missing required task resource: $rName")
+      }
+    }
+  }
+
+  // visible for testing
+  def parseOrFindResources(resourcesFile: Option[String]): Map[String, ResourceInformation] = {
+    // only parse the resources if a task requires them
+    val taskResourceConfigs = env.conf.getAllWithPrefix(SPARK_TASK_RESOURCE_PREFIX)
+    val resourceInfo = if (taskResourceConfigs.nonEmpty) {
+      val execResources = resourcesFile.map { resourceFileStr => {
+        val source = new BufferedInputStream(new FileInputStream(resourceFileStr))
+        val resourceMap = try {
+          val parsedJson = parse(source).asInstanceOf[JArray].arr
+          parsedJson.map { json =>
+            val name = (json \ "name").extract[String]
+            val addresses = (json \ "addresses").extract[Array[String]]
+            new ResourceInformation(name, addresses)
+          }.map(x => (x.name -> x)).toMap
+        } catch {
+          case e @ (_: MappingException | _: MismatchedInputException) =>
+            throw new SparkException(
+              s"Exception parsing the resources in $resourceFileStr", e)
+        } finally {
+          source.close()
+        }
+        resourceMap
+      }}.getOrElse(ResourceDiscoverer.findResources(env.conf, isDriver = false))
+
+      if (execResources.isEmpty) {
+        throw new SparkException("User specified resources per task via: " +
+          s"$SPARK_TASK_RESOURCE_PREFIX, but can't find any resources available on the executor.")
+      }
+      // get just the map of resource name to count
+      val resourcesAndCounts = taskResourceConfigs.
+        withFilter { case (k, v) => k.endsWith(SPARK_RESOURCE_COUNT_POSTFIX)}.
+        map { case (k, v) => (k.dropRight(SPARK_RESOURCE_COUNT_POSTFIX.size), v)}
+
+      checkResourcesMeetRequirements(SPARK_EXECUTOR_RESOURCE_PREFIX, resourcesAndCounts,
+        execResources)
+
+      logInfo("===============================================================================")
+      logInfo(s"Executor $executorId Resources:")
+      execResources.foreach { case (k, v) => logInfo(s"$k -> $v") }
+      logInfo("===============================================================================")
+
+      execResources
+    } else {
+      if (resourcesFile.nonEmpty) {
+        logWarning(s"A resources file was specified but the application is not configured " +
+          s"to use any resources, see the configs with prefix: ${SPARK_TASK_RESOURCE_PREFIX}")
+      }
+      Map.empty[String, ResourceInformation]
+    }
+    resourceInfo
+  }
+
   def extractLogUrls: Map[String, String] = {
     val prefix = "SPARK_LOG_URL_"
     sys.env.filterKeys(_.startsWith(prefix))
@@ -189,13 +291,14 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       cores: Int,
       appId: String,
       workerUrl: Option[String],
-      userClassPath: mutable.ListBuffer[URL])
+      userClassPath: mutable.ListBuffer[URL],
+      resourcesFile: Option[String])
 
   def main(args: Array[String]): Unit = {
     val createFn: (RpcEnv, Arguments, SparkEnv) =>
       CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env) =>
       new CoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId,
-        arguments.hostname, arguments.cores, arguments.userClassPath, env)
+        arguments.hostname, arguments.cores, arguments.userClassPath, env, arguments.resourcesFile)
     }
     run(parseArguments(args, this.getClass.getCanonicalName.stripSuffix("$")), createFn)
     System.exit(0)
@@ -257,6 +360,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
     var executorId: String = null
     var hostname: String = null
     var cores: Int = 0
+    var resourcesFile: Option[String] = None
     var appId: String = null
     var workerUrl: Option[String] = None
     val userClassPath = new mutable.ListBuffer[URL]()
@@ -276,6 +380,9 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
         case ("--cores") :: value :: tail =>
           cores = value.toInt
           argv = tail
+        case ("--resourcesFile") :: value :: tail =>
+          resourcesFile = Some(value)
+          argv = tail
         case ("--app-id") :: value :: tail =>
           appId = value
           argv = tail
@@ -301,7 +408,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
     }
 
     Arguments(driverUrl, executorId, hostname, cores, appId, workerUrl,
-      userClassPath)
+      userClassPath, resourcesFile)
   }
 
   private def printUsageAndExit(classNameForEntry: String): Unit = {
@@ -315,6 +422,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       |   --executor-id <executorId>
       |   --hostname <hostname>
       |   --cores <cores>
+      |   --resourcesFile <fileWithJSONResourceInformation>
       |   --app-id <appid>
       |   --worker-url <workerUrl>
       |   --user-class-path <url>
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 8e59ce7..0aed1af 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -30,6 +30,13 @@ import org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillReader.MAX_
 
 package object config {
 
+  private[spark] val SPARK_DRIVER_RESOURCE_PREFIX = "spark.driver.resource."
+  private[spark] val SPARK_EXECUTOR_RESOURCE_PREFIX = "spark.executor.resource."
+  private[spark] val SPARK_TASK_RESOURCE_PREFIX = "spark.task.resource."
+
+  private[spark] val SPARK_RESOURCE_COUNT_POSTFIX = ".count"
+  private[spark] val SPARK_RESOURCE_DISCOVERY_SCRIPT_POSTFIX = ".discoveryScript"
+
   private[spark] val DRIVER_CLASS_PATH =
     ConfigBuilder(SparkLauncher.DRIVER_EXTRA_CLASSPATH).stringConf.createOptional
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index afb48a31..89425e7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -19,6 +19,7 @@ package org.apache.spark.scheduler.cluster
 
 import java.nio.ByteBuffer
 
+import org.apache.spark.ResourceInformation
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.scheduler.ExecutorLossReason
@@ -64,7 +65,8 @@ private[spark] object CoarseGrainedClusterMessages {
       hostname: String,
       cores: Int,
       logUrls: Map[String, String],
-      attributes: Map[String, String])
+      attributes: Map[String, String],
+      resources: Map[String, ResourceInformation])
     extends CoarseGrainedClusterMessage
 
   case class StatusUpdate(executorId: String, taskId: Long, state: TaskState,
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 4830d0e..f7cf212 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -185,7 +185,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
 
     override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
 
-      case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls, attributes) =>
+      case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls,
+          attributes, resources) =>
         if (executorDataMap.contains(executorId)) {
           executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
           context.reply(true)
diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
index 061aeb3..dc4f4b4 100644
--- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
@@ -174,9 +174,11 @@ class HeartbeatReceiverSuite
     val dummyExecutorEndpointRef1 = rpcEnv.setupEndpoint("fake-executor-1", dummyExecutorEndpoint1)
     val dummyExecutorEndpointRef2 = rpcEnv.setupEndpoint("fake-executor-2", dummyExecutorEndpoint2)
     fakeSchedulerBackend.driverEndpoint.askSync[Boolean](
-      RegisterExecutor(executorId1, dummyExecutorEndpointRef1, "1.2.3.4", 0, Map.empty, Map.empty))
+      RegisterExecutor(executorId1, dummyExecutorEndpointRef1, "1.2.3.4", 0, Map.empty, Map.empty,
+        Map.empty))
     fakeSchedulerBackend.driverEndpoint.askSync[Boolean](
-      RegisterExecutor(executorId2, dummyExecutorEndpointRef2, "1.2.3.5", 0, Map.empty, Map.empty))
+      RegisterExecutor(executorId2, dummyExecutorEndpointRef2, "1.2.3.5", 0, Map.empty, Map.empty,
+        Map.empty))
     heartbeatReceiverRef.askSync[Boolean](TaskSchedulerIsSet)
     addExecutorAndVerify(executorId1)
     addExecutorAndVerify(executorId2)
diff --git a/core/src/test/scala/org/apache/spark/ResourceDiscovererSuite.scala b/core/src/test/scala/org/apache/spark/ResourceDiscovererSuite.scala
new file mode 100644
index 0000000..77f8a0b
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/ResourceDiscovererSuite.scala
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.io.File
+import java.nio.charset.StandardCharsets
+import java.nio.file.{Files => JavaFiles}
+import java.nio.file.attribute.PosixFilePermission._
+import java.util.EnumSet
+
+import com.google.common.io.Files
+
+import org.apache.spark.internal.config._
+import org.apache.spark.util.Utils
+
+class ResourceDiscovererSuite extends SparkFunSuite
+    with LocalSparkContext {
+
+  def mockDiscoveryScript(file: File, result: String): String = {
+    Files.write(s"echo $result", file, StandardCharsets.UTF_8)
+    JavaFiles.setPosixFilePermissions(file.toPath(),
+      EnumSet.of(OWNER_READ, OWNER_EXECUTE, OWNER_WRITE))
+    file.getPath()
+  }
+
+  test("Resource discoverer no resources") {
+    val sparkconf = new SparkConf
+    val resources = ResourceDiscoverer.findResources(sparkconf, isDriver = false)
+    assert(resources.size === 0)
+    assert(resources.get("gpu").isEmpty,
+      "Should have a gpus entry that is empty")
+  }
+
+  test("Resource discoverer multiple gpus") {
+    val sparkconf = new SparkConf
+    assume(!(Utils.isWindows))
+    withTempDir { dir =>
+      val gpuFile = new File(dir, "gpuDiscoverScript")
+      val scriptPath = mockDiscoveryScript(gpuFile,
+        """'{"name": "gpu","addresses":["0", "1"]}'""")
+      sparkconf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "gpu" +
+        SPARK_RESOURCE_DISCOVERY_SCRIPT_POSTFIX, scriptPath)
+      val resources = ResourceDiscoverer.findResources(sparkconf, isDriver = false)
+      val gpuValue = resources.get("gpu")
+      assert(gpuValue.nonEmpty, "Should have a gpu entry")
+      assert(gpuValue.get.name == "gpu", "name should be gpu")
+      assert(gpuValue.get.addresses.size == 2, "Should have 2 indexes")
+      assert(gpuValue.get.addresses.deep == Array("0", "1").deep, "should have 0,1 entries")
+    }
+  }
+
+  test("Resource discoverer no addresses errors") {
+    val sparkconf = new SparkConf
+    assume(!(Utils.isWindows))
+    withTempDir { dir =>
+      val gpuFile = new File(dir, "gpuDiscoverScript")
+      val scriptPath = mockDiscoveryScript(gpuFile,
+        """'{"name": "gpu"}'""")
+      sparkconf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "gpu" +
+        SPARK_RESOURCE_DISCOVERY_SCRIPT_POSTFIX, scriptPath)
+      val resources = ResourceDiscoverer.findResources(sparkconf, isDriver = false)
+      val gpuValue = resources.get("gpu")
+      assert(gpuValue.nonEmpty, "Should have a gpu entry")
+      assert(gpuValue.get.name == "gpu", "name should be gpu")
+      assert(gpuValue.get.addresses.size == 0, "Should have 0 indexes")
+    }
+  }
+
+  test("Resource discoverer multiple resource types") {
+    val sparkconf = new SparkConf
+    assume(!(Utils.isWindows))
+    withTempDir { dir =>
+      val gpuFile = new File(dir, "gpuDiscoverScript")
+      val gpuDiscovery = mockDiscoveryScript(gpuFile,
+        """'{"name": "gpu", "addresses": ["0", "1"]}'""")
+      sparkconf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "gpu" +
+        SPARK_RESOURCE_DISCOVERY_SCRIPT_POSTFIX, gpuDiscovery)
+
+      val fpgaFile = new File(dir, "fpgaDiscoverScript")
+      val fpgaDiscovery = mockDiscoveryScript(fpgaFile,
+        """'{"name": "fpga", "addresses": ["f1", "f2", "f3"]}'""")
+      sparkconf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "fpga" +
+        SPARK_RESOURCE_DISCOVERY_SCRIPT_POSTFIX, fpgaDiscovery)
+
+      val resources = ResourceDiscoverer.findResources(sparkconf, isDriver = false)
+      assert(resources.size === 2)
+      val gpuValue = resources.get("gpu")
+      assert(gpuValue.nonEmpty, "Should have a gpu entry")
+      assert(gpuValue.get.name == "gpu", "name should be gpu")
+      assert(gpuValue.get.addresses.size == 2, "Should have 2 indexes")
+      assert(gpuValue.get.addresses.deep == Array("0", "1").deep, "should have 0,1 entries")
+
+      val fpgaValue = resources.get("fpga")
+      assert(fpgaValue.nonEmpty, "Should have a gpu entry")
+      assert(fpgaValue.get.name == "fpga", "name should be fpga")
+      assert(fpgaValue.get.addresses.size == 3, "Should have 3 indexes")
+      assert(fpgaValue.get.addresses.deep == Array("f1", "f2", "f3").deep,
+        "should have f1,f2,f3 entries")
+    }
+  }
+
+  test("Resource discoverer multiple gpus on driver") {
+    val sparkconf = new SparkConf
+    assume(!(Utils.isWindows))
+    withTempDir { dir =>
+      val gpuFile = new File(dir, "gpuDiscoverScript")
+      val gpuDiscovery = mockDiscoveryScript(gpuFile,
+        """'{"name": "gpu", "addresses": ["0", "1"]}'""")
+      sparkconf.set(SPARK_DRIVER_RESOURCE_PREFIX + "gpu" +
+        SPARK_RESOURCE_DISCOVERY_SCRIPT_POSTFIX, gpuDiscovery)
+      sparkconf set(SPARK_EXECUTOR_RESOURCE_PREFIX + "gpu" +
+        SPARK_RESOURCE_DISCOVERY_SCRIPT_POSTFIX, "boguspath")
+      // make sure it reads from correct config, here it should use driver
+      val resources = ResourceDiscoverer.findResources(sparkconf, isDriver = true)
+      val gpuValue = resources.get("gpu")
+      assert(gpuValue.nonEmpty, "Should have a gpu entry")
+      assert(gpuValue.get.name == "gpu", "name should be gpu")
+      assert(gpuValue.get.addresses.size == 2, "Should have 2 indexes")
+      assert(gpuValue.get.addresses.deep == Array("0", "1").deep, "should have 0,1 entries")
+    }
+  }
+
+  test("Resource discoverer script returns invalid format") {
+    val sparkconf = new SparkConf
+    assume(!(Utils.isWindows))
+    withTempDir { dir =>
+      val gpuFile = new File(dir, "gpuDiscoverScript")
+      val gpuDiscovery = mockDiscoveryScript(gpuFile,
+        """'{"addresses": ["0", "1"]}'""")
+      sparkconf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "gpu" +
+        SPARK_RESOURCE_DISCOVERY_SCRIPT_POSTFIX, gpuDiscovery)
+      val error = intercept[SparkException] {
+        ResourceDiscoverer.findResources(sparkconf, isDriver = false)
+      }.getMessage()
+
+      assert(error.contains("Error running the resource discovery"))
+    }
+  }
+
+  test("Resource discoverer script doesn't exist") {
+    val sparkconf = new SparkConf
+    withTempDir { dir =>
+      val file1 = new File(dir, "bogusfilepath")
+      try {
+        sparkconf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "gpu" +
+          SPARK_RESOURCE_DISCOVERY_SCRIPT_POSTFIX, file1.getPath())
+
+        val error = intercept[SparkException] {
+          ResourceDiscoverer.findResources(sparkconf, isDriver = false)
+        }.getMessage()
+
+        assert(error.contains("doesn't exist"))
+      } finally {
+        JavaFiles.deleteIfExists(file1.toPath())
+      }
+    }
+  }
+
+  test("gpu's specified but not discovery script") {
+    val sparkconf = new SparkConf
+    sparkconf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "gpu" +
+      SPARK_RESOURCE_COUNT_POSTFIX, "2")
+
+    val error = intercept[SparkException] {
+      ResourceDiscoverer.findResources(sparkconf, isDriver = false)
+    }.getMessage()
+
+    assert(error.contains("User is expecting to use"))
+  }
+
+}
diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
index 1ed2a1a..39daf51 100644
--- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
@@ -497,7 +497,8 @@ class StandaloneDynamicAllocationSuite
     val endpointRef = mock(classOf[RpcEndpointRef])
     val mockAddress = mock(classOf[RpcAddress])
     when(endpointRef.address).thenReturn(mockAddress)
-    val message = RegisterExecutor("one", endpointRef, "blacklisted-host", 10, Map.empty, Map.empty)
+    val message = RegisterExecutor("one", endpointRef, "blacklisted-host", 10, Map.empty, Map.empty,
+      Map.empty)
 
     // Get "localhost" on a blacklist.
     val taskScheduler = mock(classOf[TaskSchedulerImpl])
@@ -621,7 +622,8 @@ class StandaloneDynamicAllocationSuite
       val endpointRef = mock(classOf[RpcEndpointRef])
       val mockAddress = mock(classOf[RpcAddress])
       when(endpointRef.address).thenReturn(mockAddress)
-      val message = RegisterExecutor(id, endpointRef, "localhost", 10, Map.empty, Map.empty)
+      val message = RegisterExecutor(id, endpointRef, "localhost", 10, Map.empty, Map.empty,
+        Map.empty)
       val backend = sc.schedulerBackend.asInstanceOf[CoarseGrainedSchedulerBackend]
       backend.driverEndpoint.askSync[Boolean](message)
     }
diff --git a/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala b/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
new file mode 100644
index 0000000..e59d1b0
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+
+import java.io.{File, PrintWriter}
+import java.net.URL
+import java.nio.charset.StandardCharsets
+import java.nio.file.{Files => JavaFiles}
+import java.nio.file.attribute.PosixFilePermission.{OWNER_EXECUTE, OWNER_READ, OWNER_WRITE}
+import java.util.EnumSet
+
+import com.google.common.io.Files
+import org.json4s.JsonAST.{JArray, JObject, JString}
+import org.json4s.JsonDSL._
+import org.json4s.jackson.JsonMethods.{compact, render}
+import org.mockito.Mockito.when
+import org.scalatest.mockito.MockitoSugar
+
+import org.apache.spark._
+import org.apache.spark.internal.config._
+import org.apache.spark.rpc.RpcEnv
+import org.apache.spark.serializer.JavaSerializer
+import org.apache.spark.util.Utils
+
+class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
+    with LocalSparkContext with MockitoSugar {
+
+  private def writeFileWithJson(dir: File, strToWrite: JArray): String = {
+    val f1 = File.createTempFile("test-resource-parser1", "", dir)
+    JavaFiles.write(f1.toPath(), compact(render(strToWrite)).getBytes())
+    f1.getPath()
+  }
+
+  test("parsing no resources") {
+    val conf = new SparkConf
+    conf.set(SPARK_TASK_RESOURCE_PREFIX + "gpu" + SPARK_RESOURCE_COUNT_POSTFIX, "2")
+    val serializer = new JavaSerializer(conf)
+    val env = createMockEnv(conf, serializer)
+
+    // we don't really use this, just need it to get at the parser function
+    val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", "host1",
+      4, Seq.empty[URL], env, None)
+    withTempDir { tmpDir =>
+      val testResourceArgs: JObject = ("" -> "")
+      val ja = JArray(List(testResourceArgs))
+      val f1 = writeFileWithJson(tmpDir, ja)
+      var error = intercept[SparkException] {
+        val parsedResources = backend.parseOrFindResources(Some(f1))
+      }.getMessage()
+
+      assert(error.contains("Exception parsing the resources in"),
+        s"Calling with no resources didn't error as expected, error: $error")
+    }
+  }
+
+  test("parsing one resources") {
+    val conf = new SparkConf
+    conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "gpu" + SPARK_RESOURCE_COUNT_POSTFIX, "2")
+    conf.set(SPARK_TASK_RESOURCE_PREFIX + "gpu" + SPARK_RESOURCE_COUNT_POSTFIX, "2")
+    val serializer = new JavaSerializer(conf)
+    val env = createMockEnv(conf, serializer)
+    // we don't really use this, just need it to get at the parser function
+    val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", "host1",
+      4, Seq.empty[URL], env, None)
+    withTempDir { tmpDir =>
+      val testResourceArgs =
+        ("name" -> "gpu") ~
+        ("addresses" -> Seq("0", "1"))
+      val ja = JArray(List(testResourceArgs))
+      val f1 = writeFileWithJson(tmpDir, ja)
+      val parsedResources = backend.parseOrFindResources(Some(f1))
+
+      assert(parsedResources.size === 1)
+      assert(parsedResources.get("gpu").nonEmpty)
+      assert(parsedResources.get("gpu").get.name === "gpu")
+      assert(parsedResources.get("gpu").get.addresses.deep === Array("0", "1").deep)
+    }
+  }
+
+  test("parsing multiple resources") {
+    val conf = new SparkConf
+    conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "fpga" + SPARK_RESOURCE_COUNT_POSTFIX, "3")
+    conf.set(SPARK_TASK_RESOURCE_PREFIX + "fpga" + SPARK_RESOURCE_COUNT_POSTFIX, "3")
+    conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "gpu" + SPARK_RESOURCE_COUNT_POSTFIX, "2")
+    conf.set(SPARK_TASK_RESOURCE_PREFIX + "gpu" + SPARK_RESOURCE_COUNT_POSTFIX, "2")
+    val serializer = new JavaSerializer(conf)
+    val env = createMockEnv(conf, serializer)
+    // we don't really use this, just need it to get at the parser function
+    val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", "host1",
+      4, Seq.empty[URL], env, None)
+
+    withTempDir { tmpDir =>
+      val gpuArgs =
+        ("name" -> "gpu") ~
+          ("addresses" -> Seq("0", "1"))
+      val fpgaArgs =
+        ("name" -> "fpga") ~
+          ("addresses" -> Seq("f1", "f2", "f3"))
+      val ja = JArray(List(gpuArgs, fpgaArgs))
+      val f1 = writeFileWithJson(tmpDir, ja)
+      val parsedResources = backend.parseOrFindResources(Some(f1))
+
+      assert(parsedResources.size === 2)
+      assert(parsedResources.get("gpu").nonEmpty)
+      assert(parsedResources.get("gpu").get.name === "gpu")
+      assert(parsedResources.get("gpu").get.addresses.deep === Array("0", "1").deep)
+      assert(parsedResources.get("fpga").nonEmpty)
+      assert(parsedResources.get("fpga").get.name === "fpga")
+      assert(parsedResources.get("fpga").get.addresses.deep === Array("f1", "f2", "f3").deep)
+    }
+  }
+
+  test("error checking parsing resources and executor and task configs") {
+    val conf = new SparkConf
+    conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "gpu" + SPARK_RESOURCE_COUNT_POSTFIX, "2")
+    conf.set(SPARK_TASK_RESOURCE_PREFIX + "gpu" + SPARK_RESOURCE_COUNT_POSTFIX, "2")
+    val serializer = new JavaSerializer(conf)
+    val env = createMockEnv(conf, serializer)
+    // we don't really use this, just need it to get at the parser function
+    val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1",
+      4, Seq.empty[URL], env, None)
+
+    // not enough gpu's on the executor
+    withTempDir { tmpDir =>
+      val gpuArgs =
+        ("name" -> "gpu") ~
+          ("addresses" -> Seq("0"))
+      val ja = JArray(List(gpuArgs))
+      val f1 = writeFileWithJson(tmpDir, ja)
+
+      var error = intercept[SparkException] {
+        val parsedResources = backend.parseOrFindResources(Some(f1))
+      }.getMessage()
+
+      assert(error.contains("doesn't meet the requirements of needing"))
+    }
+
+    // missing resource on the executor
+    withTempDir { tmpDir =>
+      val gpuArgs =
+        ("name" -> "fpga") ~
+          ("addresses" -> Seq("0"))
+      val ja = JArray(List(gpuArgs))
+      val f1 = writeFileWithJson(tmpDir, ja)
+
+      var error = intercept[SparkException] {
+        val parsedResources = backend.parseOrFindResources(Some(f1))
+      }.getMessage()
+
+      assert(error.contains("Executor resource config missing required task resource"))
+    }
+  }
+
+  test("executor resource found less than required") {
+    val conf = new SparkConf
+    conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "gpu" + SPARK_RESOURCE_COUNT_POSTFIX, "4")
+    conf.set(SPARK_TASK_RESOURCE_PREFIX + "gpu" + SPARK_RESOURCE_COUNT_POSTFIX, "1")
+    val serializer = new JavaSerializer(conf)
+    val env = createMockEnv(conf, serializer)
+    // we don't really use this, just need it to get at the parser function
+    val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1",
+      4, Seq.empty[URL], env, None)
+
+    // executor resources < required
+    withTempDir { tmpDir =>
+      val gpuArgs =
+        ("name" -> "gpu") ~
+          ("addresses" -> Seq("0", "1"))
+      val ja = JArray(List(gpuArgs))
+      val f1 = writeFileWithJson(tmpDir, ja)
+
+      var error = intercept[SparkException] {
+        val parsedResources = backend.parseOrFindResources(Some(f1))
+      }.getMessage()
+
+      assert(error.contains("is less than what the user requested for count"))
+    }
+  }
+
+  test("parsing resources task configs with missing executor config") {
+    val conf = new SparkConf
+    conf.set(SPARK_TASK_RESOURCE_PREFIX + "gpu" + SPARK_RESOURCE_COUNT_POSTFIX, "2")
+    val serializer = new JavaSerializer(conf)
+    val env = createMockEnv(conf, serializer)
+    // we don't really use this, just need it to get at the parser function
+    val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1",
+      4, Seq.empty[URL], env, None)
+
+    withTempDir { tmpDir =>
+      val gpuArgs =
+        ("name" -> "gpu") ~
+          ("addresses" -> Seq("0", "1"))
+      val ja = JArray(List(gpuArgs))
+      val f1 = writeFileWithJson(tmpDir, ja)
+
+      var error = intercept[SparkException] {
+        val parsedResources = backend.parseOrFindResources(Some(f1))
+      }.getMessage()
+
+      assert(error.contains("Resource: gpu not specified via config: " +
+        "spark.executor.resource.gpu.count, but required, please " +
+        "fix your configuration"))
+    }
+  }
+
+  test("use discoverer") {
+    val conf = new SparkConf
+    conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "fpga" + SPARK_RESOURCE_COUNT_POSTFIX, "3")
+    conf.set(SPARK_TASK_RESOURCE_PREFIX + "fpga" + SPARK_RESOURCE_COUNT_POSTFIX, "3")
+    assume(!(Utils.isWindows))
+    withTempDir { dir =>
+      val fpgaDiscovery = new File(dir, "resourceDiscoverScriptfpga")
+      Files.write("""echo '{"name": "fpga","addresses":["f1", "f2", "f3"]}'""",
+        fpgaDiscovery, StandardCharsets.UTF_8)
+      JavaFiles.setPosixFilePermissions(fpgaDiscovery.toPath(),
+        EnumSet.of(OWNER_READ, OWNER_EXECUTE, OWNER_WRITE))
+      conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "fpga" +
+        SPARK_RESOURCE_DISCOVERY_SCRIPT_POSTFIX, fpgaDiscovery.getPath())
+
+      val serializer = new JavaSerializer(conf)
+      val env = createMockEnv(conf, serializer)
+
+      // we don't really use this, just need it to get at the parser function
+      val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1",
+        4, Seq.empty[URL], env, None)
+
+      val parsedResources = backend.parseOrFindResources(None)
+
+      assert(parsedResources.size === 1)
+      assert(parsedResources.get("fpga").nonEmpty)
+      assert(parsedResources.get("fpga").get.name === "fpga")
+      assert(parsedResources.get("fpga").get.addresses.deep === Array("f1", "f2", "f3").deep)
+    }
+  }
+
+  private def createMockEnv(conf: SparkConf, serializer: JavaSerializer): SparkEnv = {
+    val mockEnv = mock[SparkEnv]
+    val mockRpcEnv = mock[RpcEnv]
+    when(mockEnv.conf).thenReturn(conf)
+    when(mockEnv.serializer).thenReturn(serializer)
+    when(mockEnv.closureSerializer).thenReturn(serializer)
+    when(mockEnv.rpcEnv).thenReturn(mockRpcEnv)
+    SparkEnv.set(mockEnv)
+    mockEnv
+  }
+}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
index 33f48b8..4858d38 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
@@ -164,11 +164,11 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo
     sc.addSparkListener(listener)
 
     backend.driverEndpoint.askSync[Boolean](
-      RegisterExecutor("1", mockEndpointRef, mockAddress.host, 1, logUrls, attributes))
+      RegisterExecutor("1", mockEndpointRef, mockAddress.host, 1, logUrls, attributes, Map.empty))
     backend.driverEndpoint.askSync[Boolean](
-      RegisterExecutor("2", mockEndpointRef, mockAddress.host, 1, logUrls, attributes))
+      RegisterExecutor("2", mockEndpointRef, mockAddress.host, 1, logUrls, attributes, Map.empty))
     backend.driverEndpoint.askSync[Boolean](
-      RegisterExecutor("3", mockEndpointRef, mockAddress.host, 1, logUrls, attributes))
+      RegisterExecutor("3", mockEndpointRef, mockAddress.host, 1, logUrls, attributes, Map.empty))
 
     sc.listenerBus.waitUntilEmpty(executorUpTimeout.toMillis)
     assert(executorAddedCount === 3)
diff --git a/docs/configuration.md b/docs/configuration.md
index da2d279..d0b2699 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -222,6 +222,25 @@ of the most common options to set are:
   </td>
 </tr>
 <tr>
+ <td><code>spark.executor.resource.{resourceType}.count</code></td>
+  <td>0</td>
+  <td>
+    The number of a particular resource type to use per executor process.
+    If this is used, you must also specify the
+    <code>spark.executor.resource.{resourceType}.discoveryScript</code>
+    for the executor to find the resource on startup.
+  </td>
+</tr>
+<tr>
+ <td><code>spark.executor.resource.{resourceType}.discoveryScript</code></td>
+  <td>None</td>
+  <td>
+    A script for the executor to run to discover a particular resource type. This should
+    write to STDOUT a JSON string in the format of the ResourceInformation class. This has a
+    name and an array of addresses.
+  </td>
+</tr>
+<tr>
   <td><code>spark.extraListeners</code></td>
   <td>(none)</td>
   <td>
@@ -1794,6 +1813,15 @@ Apart from these, the following properties are also available, and may be useful
   </td>
 </tr>
 <tr>
+  <td><code>spark.task.resource.{resourceType}.count</code></td>
+  <td>1</td>
+  <td>
+    Number of a particular resource type to allocate for each task. If this is specified
+    you must also provide the executor config <code>spark.executor.resource.{resourceType}.count</code>
+    and any corresponding discovery configs so that your executors are created with that resource type.
+  </td>
+</tr>
+<tr>
   <td><code>spark.task.maxFailures</code></td>
   <td>4</td>
   <td>
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index 37c0f5f..34b5f6c 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -692,7 +692,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     val mockEndpointRef = mock[RpcEndpointRef]
     val mockAddress = mock[RpcAddress]
     val message = RegisterExecutor(executorId, mockEndpointRef, slaveId, cores, Map.empty,
-      Map.empty)
+      Map.empty, Map.empty)
 
     backend.driverEndpoint.askSync[Boolean](message)
   }
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala
index 53e99d9..380da36 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala
@@ -37,7 +37,8 @@ private[spark] class YarnCoarseGrainedExecutorBackend(
     hostname: String,
     cores: Int,
     userClassPath: Seq[URL],
-    env: SparkEnv)
+    env: SparkEnv,
+    resourcesFile: Option[String])
   extends CoarseGrainedExecutorBackend(
     rpcEnv,
     driverUrl,
@@ -45,7 +46,8 @@ private[spark] class YarnCoarseGrainedExecutorBackend(
     hostname,
     cores,
     userClassPath,
-    env) with Logging {
+    env,
+    resourcesFile) with Logging {
 
   private lazy val hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(env.conf)
 
@@ -66,7 +68,7 @@ private[spark] object YarnCoarseGrainedExecutorBackend extends Logging {
     val createFn: (RpcEnv, CoarseGrainedExecutorBackend.Arguments, SparkEnv) =>
       CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env) =>
       new YarnCoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId,
-        arguments.hostname, arguments.cores, arguments.userClassPath, env)
+        arguments.hostname, arguments.cores, arguments.userClassPath, env, arguments.resourcesFile)
     }
     val backendArgs = CoarseGrainedExecutorBackend.parseArguments(args,
       this.getClass.getCanonicalName.stripSuffix("$"))


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org