Posted to commits@spark.apache.org by tg...@apache.org on 2019/11/25 15:37:05 UTC

[spark] branch master updated: [SPARK-29415][CORE] Stage Level Sched: Add base ResourceProfile and Request classes

This is an automated email from the ASF dual-hosted git repository.

tgraves pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 2d5de25  [SPARK-29415][CORE] Stage Level Sched: Add base ResourceProfile and Request classes
2d5de25 is described below

commit 2d5de25a999e0e5580cf4024937b61e6c9265672
Author: Thomas Graves <tg...@nvidia.com>
AuthorDate: Mon Nov 25 09:36:39 2019 -0600

    [SPARK-29415][CORE] Stage Level Sched: Add base ResourceProfile and Request classes
    
    ### What changes were proposed in this pull request?
    
    This PR adds the base classes needed for stage level scheduling: a ResourceProfile and the executor and task resource request classes. These are private for now until all the parts are implemented, at which point they will become public interfaces. I am adding them first because all the other subtasks for this feature require these classes. If people have better ideas on breaking this feature up, please let me know.
    
    See https://issues.apache.org/jira/browse/SPARK-29415 for more detailed design.
    
    ### Why are the changes needed?
    
    New API for stage level scheduling. It's easier to add these first because the other JIRAs for this feature will all use them.
    
    ### Does this PR introduce any user-facing change?
    
    Yes, this adds an API to create a ResourceProfile with executor/task resources; see the SPIP JIRA https://issues.apache.org/jira/browse/SPARK-27495
    
    Example of the API:
    val rp = new ResourceProfile()
    rp.require(new ExecutorResourceRequests().cores(2))
    rp.require(new ExecutorResourceRequests().resource("resource.gpu", 1, "/opt/gpuScripts/getGpus"))
    rp.require(new TaskResourceRequests().resource("resource.gpu", 1))
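
    The request classes in this commit validate their arguments at construction
    time: executor resource names must be one of the built-in names or start with
    "resource.", task resource names must be "cpus" or start with "resource.", and
    task amounts must be <= 0.5 or a whole number. Since the real classes are
    private[spark], the following is only a minimal standalone sketch of those
    checks. The object and the Sketch* case classes are hypothetical stand-ins,
    not the Spark classes, and the sketch uses require for every check, whereas
    the diff uses assert for the amount check.

    ```scala
    object ResourceRequestSketch {
      // Prefix that marks a custom resource such as resource.gpu (RESOURCE_DOT in the diff).
      val ResourceDot = "resource."

      // Built-in executor resource names accepted by ExecutorResourceRequest in the diff.
      val AllowedExecutorResources: Set[String] =
        Set("cores", "memory", "memoryOverhead", "pyspark.memory")

      // Hypothetical stand-in for ExecutorResourceRequest; empty strings mean "not set".
      final case class SketchExecutorRequest(
          resourceName: String,
          amount: Long,
          discoveryScript: String = "",
          vendor: String = "") {
        require(
          AllowedExecutorResources.contains(resourceName) || resourceName.startsWith(ResourceDot),
          s"Executor resource not allowed: $resourceName")
      }

      // Hypothetical stand-in for TaskResourceRequest. The diff uses assert for the
      // amount check; this sketch uses require for both checks to keep it short.
      final case class SketchTaskRequest(resourceName: String, amount: Double) {
        require(amount <= 0.5 || amount % 1 == 0,
          s"The resource amount $amount must be either <= 0.5, or a whole number.")
        require(resourceName == "cpus" || resourceName.startsWith(ResourceDot),
          s"Task resource not allowed: $resourceName")
      }

      def main(args: Array[String]): Unit = {
        // Accepted: a custom resource with the resource. prefix and a discovery script.
        val execGpu = SketchExecutorRequest("resource.gpu", 1, "/opt/gpuScripts/getGpus")
        // Accepted: half a GPU per task, i.e. two tasks share one resource address.
        val taskGpu = SketchTaskRequest("resource.gpu", 0.5)
        println(s"$execGpu, $taskGpu")

        // Rejected: a bare name without the prefix, and a fraction above 0.5.
        println(scala.util.Try(SketchExecutorRequest("gpu", 1)).isFailure)
        println(scala.util.Try(SketchTaskRequest("resource.gpu", 0.75)).isFailure)
      }
    }
    ```

    Using empty strings rather than Option for discoveryScript and vendor follows
    the diff, which notes that this keeps the API usable from Java.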
    
    ### How was this patch tested?
    
    Tested using unit tests added with this PR.
    
    Closes #26284 from tgravescs/SPARK-29415.
    
    Authored-by: Thomas Graves <tg...@nvidia.com>
    Signed-off-by: Thomas Graves <tg...@apache.org>
---
 .../spark/resource/ExecutorResourceRequest.scala   |  77 ++++++++++
 .../spark/resource/ExecutorResourceRequests.scala  | 122 +++++++++++++++
 .../apache/spark/resource/ResourceProfile.scala    | 147 ++++++++++++++++++
 .../org/apache/spark/resource/ResourceUtils.scala  |   7 +-
 .../spark/resource/TaskResourceRequest.scala       |  43 ++++++
 .../spark/resource/TaskResourceRequests.scala      |  75 ++++++++++
 .../spark/resource/JavaResourceProfileSuite.java   |  66 ++++++++
 .../spark/resource/ResourceProfileSuite.scala      | 166 +++++++++++++++++++++
 8 files changed, 701 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequest.scala b/core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequest.scala
new file mode 100644
index 0000000..88ceaad
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequest.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.resource
+
+import scala.collection.mutable
+
+import org.apache.spark.resource.ResourceUtils.RESOURCE_DOT
+
+/**
+ * An Executor resource request. This is used in conjunction with the ResourceProfile to
+ * programmatically specify the resources needed for an RDD that will be applied at the
+ * stage level.
+ *
+ * This is used to specify what the resource requirements are for an Executor and how
+ * Spark can find out specific details about those resources. Not all the parameters are
+ * required for every resource type. The resource names supported
+ * correspond to the regular Spark configs with the prefix removed. For instance overhead
+ * memory in this api is memoryOverhead, which is spark.executor.memoryOverhead with
+ * spark.executor removed. Resources like GPUs are resource.gpu
+ * (spark configs spark.executor.resource.gpu.*). The amount, discoveryScript, and vendor
+ * parameters for resources are all the same parameters a user would specify through the
+ * configs: spark.executor.resource.{resourceName}.{amount, discoveryScript, vendor}.
+ *
+ * For instance, a user wants to allocate an Executor with GPU resources on YARN. The user has
+ * to specify the resource name (resource.gpu) and the amount or number of GPUs per Executor.
+ * The discovery script would be specified so that when the Executor starts up it can
+ * discover what GPU addresses are available for it to use, because YARN doesn't tell
+ * Spark that. The vendor would not be used because it's specific to Kubernetes.
+ *
+ * See the configuration and cluster specific docs for more details.
+ *
+ * Use ExecutorResourceRequests class as a convenience API.
+ *
+ * @param resourceName Name of the resource
+ * @param amount Amount requesting
+ * @param discoveryScript Optional script used to discover the resources. This is required on some
+ *                        cluster managers that don't tell Spark the addresses of the resources
+ *                        allocated. The script runs on Executor startup to discover the addresses
+ *                        of the resources available.
+ * @param vendor Optional vendor, required for some cluster managers
+ *
+ * This api is currently private until the rest of the pieces are in place and then it
+ * will become public.
+ */
+private[spark] class ExecutorResourceRequest(
+    val resourceName: String,
+    val amount: Long,
+    val discoveryScript: String = "",
+    val vendor: String = "") extends Serializable {
+
+  // A list of allowed Spark internal resources. Custom resources (spark.executor.resource.*)
+  // like GPUs/FPGAs are also allowed, see the check below.
+  private val allowedExecutorResources = mutable.HashSet[String](
+    ResourceProfile.MEMORY,
+    ResourceProfile.OVERHEAD_MEM,
+    ResourceProfile.PYSPARK_MEM,
+    ResourceProfile.CORES)
+
+  if (!allowedExecutorResources.contains(resourceName) && !resourceName.startsWith(RESOURCE_DOT)) {
+    throw new IllegalArgumentException(s"Executor resource not allowed: $resourceName")
+  }
+}
diff --git a/core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequests.scala b/core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequests.scala
new file mode 100644
index 0000000..6ffcc0c
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequests.scala
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.resource
+
+import scala.collection.mutable
+
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.resource.ResourceProfile._
+
+/**
+ * A set of Executor resource requests. This is used in conjunction with the ResourceProfile to
+ * programmatically specify the resources needed for an RDD that will be applied at the
+ * stage level.
+ *
+ * This api is currently private until the rest of the pieces are in place and then it
+ * will become public.
+ */
+private[spark] class ExecutorResourceRequests() extends Serializable {
+
+  private val _executorResources = new mutable.HashMap[String, ExecutorResourceRequest]()
+
+  def requests: Map[String, ExecutorResourceRequest] = _executorResources.toMap
+
+  /**
+   * Specify heap memory. The value specified will be converted to MiB.
+   *
+   * @param amount Amount of memory. In the same format as JVM memory strings (e.g. 512m, 2g).
+   *               Default unit is MiB if not specified.
+   */
+  def memory(amount: String): this.type = {
+    val amountMiB = JavaUtils.byteStringAsMb(amount)
+    val rr = new ExecutorResourceRequest(MEMORY, amountMiB)
+    _executorResources(MEMORY) = rr
+    this
+  }
+
+  /**
+   * Specify overhead memory. The value specified will be converted to MiB.
+   *
+   * @param amount Amount of memory. In the same format as JVM memory strings (e.g. 512m, 2g).
+   *               Default unit is MiB if not specified.
+   */
+  def memoryOverhead(amount: String): this.type = {
+    val amountMiB = JavaUtils.byteStringAsMb(amount)
+    val rr = new ExecutorResourceRequest(OVERHEAD_MEM, amountMiB)
+    _executorResources(OVERHEAD_MEM) = rr
+    this
+  }
+
+  /**
+   * Specify pyspark memory. The value specified will be converted to MiB.
+   *
+   * @param amount Amount of memory. In the same format as JVM memory strings (e.g. 512m, 2g).
+   *               Default unit is MiB if not specified.
+   */
+  def pysparkMemory(amount: String): this.type = {
+    val amountMiB = JavaUtils.byteStringAsMb(amount)
+    val rr = new ExecutorResourceRequest(PYSPARK_MEM, amountMiB)
+    _executorResources(PYSPARK_MEM) = rr
+    this
+  }
+
+  /**
+   * Specify number of cores per Executor.
+   *
+   * @param amount Number of cores to allocate per Executor.
+   */
+  def cores(amount: Int): this.type = {
+    val t = new ExecutorResourceRequest(CORES, amount)
+    _executorResources(CORES) = t
+    this
+  }
+
+  /**
+   *  Amount of a particular custom resource(GPU, FPGA, etc) to use. The resource names supported
+   *  correspond to the regular Spark configs with the prefix removed. For instance, resources
+   *  like GPUs are resource.gpu (spark configs spark.executor.resource.gpu.*)
+   *
+   * @param resourceName Name of the resource.
+   * @param amount amount of that resource per executor to use.
+   * @param discoveryScript Optional script used to discover the resources. This is required on
+   *                        some cluster managers that don't tell Spark the addresses of
+   *                        the resources allocated. The script runs on Executor startup to
+   *                        discover the addresses of the resources available.
+   * @param vendor Optional vendor, required for some cluster managers
+   */
+  def resource(
+      resourceName: String,
+      amount: Long,
+      discoveryScript: String = "",
+      vendor: String = ""): this.type = {
+    // a bit weird, but for the Java api use empty string as meaning None because empty
+    // string is otherwise invalid for those parameters anyway
+    val eReq = new ExecutorResourceRequest(resourceName, amount, discoveryScript, vendor)
+    _executorResources(resourceName) = eReq
+    this
+  }
+
+  def addRequest(ereq: ExecutorResourceRequest): this.type = {
+    _executorResources(ereq.resourceName) = ereq
+    this
+  }
+
+  override def toString: String = {
+    s"Executor resource requests: ${_executorResources}"
+  }
+}
diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
new file mode 100644
index 0000000..876a655
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.resource
+
+import java.util.{Map => JMap}
+import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.SparkConf
+import org.apache.spark.annotation.Evolving
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.resource.ResourceUtils.RESOURCE_PREFIX
+
+/**
+ * Resource profile to associate with an RDD. A ResourceProfile allows the user to
+ * specify executor and task requirements for an RDD that will get applied during a
+ * stage. This allows the user to change the resource requirements between stages.
+ *
+ * This class is private for now during initial development; once the feature is in place
+ * it will become public.
+ */
+@Evolving
+private[spark] class ResourceProfile() extends Serializable {
+
+  private val _id = ResourceProfile.getNextProfileId
+  private val _taskResources = new mutable.HashMap[String, TaskResourceRequest]()
+  private val _executorResources = new mutable.HashMap[String, ExecutorResourceRequest]()
+
+  def id: Int = _id
+  def taskResources: Map[String, TaskResourceRequest] = _taskResources.toMap
+  def executorResources: Map[String, ExecutorResourceRequest] = _executorResources.toMap
+
+  /**
+   * (Java-specific) gets a Java Map of resources to TaskResourceRequest
+   */
+  def taskResourcesJMap: JMap[String, TaskResourceRequest] = _taskResources.asJava
+
+  /**
+   * (Java-specific) gets a Java Map of resources to ExecutorResourceRequest
+   */
+  def executorResourcesJMap: JMap[String, ExecutorResourceRequest] = _executorResources.asJava
+
+  def reset(): Unit = {
+    _taskResources.clear()
+    _executorResources.clear()
+  }
+
+  def require(requests: ExecutorResourceRequests): this.type = {
+    _executorResources ++= requests.requests
+    this
+  }
+
+  def require(requests: TaskResourceRequests): this.type = {
+    _taskResources ++= requests.requests
+    this
+  }
+
+  override def toString(): String = {
+    s"Profile: id = ${_id}, executor resources: ${_executorResources}, " +
+      s"task resources: ${_taskResources}"
+  }
+}
+
+private[spark] object ResourceProfile extends Logging {
+  val UNKNOWN_RESOURCE_PROFILE_ID = -1
+  val DEFAULT_RESOURCE_PROFILE_ID = 0
+
+  val CPUS = "cpus"
+  val CORES = "cores"
+  val MEMORY = "memory"
+  val OVERHEAD_MEM = "memoryOverhead"
+  val PYSPARK_MEM = "pyspark.memory"
+
+  private lazy val nextProfileId = new AtomicInteger(0)
+
+  // The default resource profile uses the application level configs.
+  // Create the default profile immediately to get ID 0; it's initialized later when fetched.
+  private val defaultProfileRef: AtomicReference[ResourceProfile] =
+    new AtomicReference[ResourceProfile](new ResourceProfile())
+
+  assert(defaultProfileRef.get().id == DEFAULT_RESOURCE_PROFILE_ID,
+    s"Default Profile must have the default profile id: $DEFAULT_RESOURCE_PROFILE_ID")
+
+  def getNextProfileId: Int = nextProfileId.getAndIncrement()
+
+  def getOrCreateDefaultProfile(conf: SparkConf): ResourceProfile = {
+    val defaultProf = defaultProfileRef.get()
+    // check to see if the default profile was initialized yet
+    if (defaultProf.executorResources == Map.empty) {
+      synchronized {
+        val prof = defaultProfileRef.get()
+        if (prof.executorResources == Map.empty) {
+          addDefaultTaskResources(prof, conf)
+          addDefaultExecutorResources(prof, conf)
+        }
+        prof
+      }
+    } else {
+      defaultProf
+    }
+  }
+
+  private def addDefaultTaskResources(rprof: ResourceProfile, conf: SparkConf): Unit = {
+    val cpusPerTask = conf.get(CPUS_PER_TASK)
+    val treqs = new TaskResourceRequests().cpus(cpusPerTask)
+    val taskReq = ResourceUtils.parseResourceRequirements(conf, SPARK_TASK_PREFIX)
+    taskReq.foreach { req =>
+      val name = s"${RESOURCE_PREFIX}.${req.resourceName}"
+      treqs.resource(name, req.amount)
+    }
+    rprof.require(treqs)
+  }
+
+  private def addDefaultExecutorResources(rprof: ResourceProfile, conf: SparkConf): Unit = {
+    val ereqs = new ExecutorResourceRequests()
+    ereqs.cores(conf.get(EXECUTOR_CORES))
+    ereqs.memory(conf.get(EXECUTOR_MEMORY).toString)
+    val execReq = ResourceUtils.parseAllResourceRequests(conf, SPARK_EXECUTOR_PREFIX)
+    execReq.foreach { req =>
+      val name = s"${RESOURCE_PREFIX}.${req.id.resourceName}"
+      ereqs.resource(name, req.amount, req.discoveryScript.getOrElse(""),
+        req.vendor.getOrElse(""))
+    }
+    rprof.require(ereqs)
+  }
+
+  // for testing purposes
+  def resetDefaultProfile(conf: SparkConf): Unit = getOrCreateDefaultProfile(conf).reset()
+}
diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala b/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala
index e5ae7a9..ce4fd05 100644
--- a/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala
@@ -36,7 +36,7 @@ import org.apache.spark.util.Utils.executeAndGetOutput
  * @param resourceName  gpu, fpga, etc
  */
 private[spark] case class ResourceID(componentName: String, resourceName: String) {
-  def confPrefix: String = s"$componentName.resource.$resourceName." // with ending dot
+  def confPrefix: String = s"$componentName.${ResourceUtils.RESOURCE_PREFIX}.$resourceName."
   def amountConf: String = s"$confPrefix${ResourceUtils.AMOUNT}"
   def discoveryScriptConf: String = s"$confPrefix${ResourceUtils.DISCOVERY_SCRIPT}"
   def vendorConf: String = s"$confPrefix${ResourceUtils.VENDOR}"
@@ -111,7 +111,7 @@ private[spark] object ResourceUtils extends Logging {
   }
 
   def listResourceIds(sparkConf: SparkConf, componentName: String): Seq[ResourceID] = {
-    sparkConf.getAllWithPrefix(s"$componentName.resource.").map { case (key, _) =>
+    sparkConf.getAllWithPrefix(s"$componentName.$RESOURCE_DOT").map { case (key, _) =>
       key.substring(0, key.indexOf('.'))
     }.toSet.toSeq.map(name => ResourceID(componentName, name))
   }
@@ -258,4 +258,7 @@ private[spark] object ResourceUtils extends Logging {
   // known types of resources
   final val GPU: String = "gpu"
   final val FPGA: String = "fpga"
+
+  final val RESOURCE_PREFIX: String = "resource"
+  final val RESOURCE_DOT: String = s"$RESOURCE_PREFIX."
 }
diff --git a/core/src/main/scala/org/apache/spark/resource/TaskResourceRequest.scala b/core/src/main/scala/org/apache/spark/resource/TaskResourceRequest.scala
new file mode 100644
index 0000000..22eda52
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/resource/TaskResourceRequest.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.resource
+
+import scala.collection.mutable
+
+import org.apache.spark.resource.ResourceUtils.RESOURCE_DOT
+
+/**
+ * A task resource request. This is used in conjunction with the ResourceProfile to
+ * programmatically specify the resources needed for an RDD that will be applied at the
+ * stage level.
+ *
+ * Use TaskResourceRequests class as a convenience API.
+ *
+ * This api is currently private until the rest of the pieces are in place and then it
+ * will become public.
+ */
+private[spark] class TaskResourceRequest(val resourceName: String, val amount: Double)
+  extends Serializable {
+
+  assert(amount <= 0.5 || amount % 1 == 0,
+    s"The resource amount ${amount} must be either <= 0.5, or a whole number.")
+
+  if (!resourceName.equals(ResourceProfile.CPUS) && !resourceName.startsWith(RESOURCE_DOT)) {
+    throw new IllegalArgumentException(s"Task resource not allowed: $resourceName")
+  }
+}
diff --git a/core/src/main/scala/org/apache/spark/resource/TaskResourceRequests.scala b/core/src/main/scala/org/apache/spark/resource/TaskResourceRequests.scala
new file mode 100644
index 0000000..21cbc5d
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/resource/TaskResourceRequests.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.resource
+
+import scala.collection.mutable
+
+import org.apache.spark.resource.ResourceProfile._
+import org.apache.spark.resource.ResourceUtils._
+
+/**
+ * A set of task resource requests. This is used in conjunction with the ResourceProfile to
+ * programmatically specify the resources needed for an RDD that will be applied at the
+ * stage level.
+ *
+ * This api is currently private until the rest of the pieces are in place and then it
+ * will become public.
+ */
+private[spark] class TaskResourceRequests() extends Serializable {
+
+  private val _taskResources = new mutable.HashMap[String, TaskResourceRequest]()
+
+  def requests: Map[String, TaskResourceRequest] = _taskResources.toMap
+
+  /**
+   * Specify number of cpus per Task.
+   *
+   * @param amount Number of cpus to allocate per Task.
+   */
+  def cpus(amount: Int): this.type = {
+    val t = new TaskResourceRequest(CPUS, amount)
+    _taskResources(CPUS) = t
+    this
+  }
+
+  /**
+   *  Amount of a particular custom resource(GPU, FPGA, etc) to use. The resource names supported
+   *  correspond to the regular Spark configs with the prefix removed. For instance, resources
+   *  like GPUs are resource.gpu (spark configs spark.task.resource.gpu.*)
+   *
+   * @param rName Name of the resource.
+   * @param amount Amount requesting as a Double to support fractional resource requests.
+   *               Valid values are less than or equal to 0.5 or whole numbers. This essentially
+   *               lets you configure X number of tasks to run on a single resource,
+   *               i.e. amount equals 0.5 translates into 2 tasks per resource address.
+   */
+  def resource(rName: String, amount: Double): this.type = {
+    val t = new TaskResourceRequest(rName, amount)
+    _taskResources(rName) = t
+    this
+  }
+
+  def addRequest(treq: TaskResourceRequest): this.type = {
+    _taskResources(treq.resourceName) = treq
+    this
+  }
+
+  override def toString: String = {
+    s"Task resource requests: ${_taskResources}"
+  }
+}
diff --git a/core/src/test/java/org/apache/spark/resource/JavaResourceProfileSuite.java b/core/src/test/java/org/apache/spark/resource/JavaResourceProfileSuite.java
new file mode 100644
index 0000000..0771207
--- /dev/null
+++ b/core/src/test/java/org/apache/spark/resource/JavaResourceProfileSuite.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.resource;
+
+import java.util.Map;
+
+import static org.junit.Assert.*;
+import org.junit.Test;
+
+// Test the ResourceProfile and Request APIs from Java
+public class JavaResourceProfileSuite {
+
+  String GpuResource = "resource.gpu";
+  String FPGAResource = "resource.fpga";
+
+  @Test
+  public void testResourceProfileAccessFromJava() throws Exception {
+    ExecutorResourceRequests execReqGpu =
+      new ExecutorResourceRequests().resource(GpuResource, 2, "myscript", "");
+    ExecutorResourceRequests execReqFpga =
+      new ExecutorResourceRequests().resource(FPGAResource, 3, "myfpgascript", "nvidia");
+
+    ResourceProfile rprof = new ResourceProfile();
+    rprof.require(execReqGpu);
+    rprof.require(execReqFpga);
+    TaskResourceRequests taskReq1 = new TaskResourceRequests().resource(GpuResource, 1);
+    rprof.require(taskReq1);
+
+    assertEquals(rprof.executorResources().size(), 2);
+    Map<String, ExecutorResourceRequest> eresources = rprof.executorResourcesJMap();
+    assertTrue(eresources.containsKey(GpuResource));
+    ExecutorResourceRequest gpuReq = eresources.get(GpuResource);
+    assertEquals(gpuReq.amount(), 2);
+    assertEquals(gpuReq.discoveryScript(), "myscript");
+    assertEquals(gpuReq.vendor(), "");
+
+    assertTrue(eresources.containsKey(FPGAResource));
+    ExecutorResourceRequest fpgaReq = eresources.get(FPGAResource);
+    assertEquals(fpgaReq.amount(), 3);
+    assertEquals(fpgaReq.discoveryScript(), "myfpgascript");
+    assertEquals(fpgaReq.vendor(), "nvidia");
+
+    assertEquals(rprof.taskResources().size(), 1);
+    Map<String, TaskResourceRequest> tresources = rprof.taskResourcesJMap();
+    assertTrue(tresources.containsKey(GpuResource));
+    TaskResourceRequest taskReq = tresources.get(GpuResource);
+    assertEquals(taskReq.amount(), 1.0, 0);
+    assertEquals(taskReq.resourceName(), GpuResource);
+  }
+}
+
diff --git a/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala b/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala
new file mode 100644
index 0000000..a087f18
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.resource
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.internal.config._
+
+class ResourceProfileSuite extends SparkFunSuite {
+
+  override def afterEach(): Unit = {
+    try {
+      ResourceProfile.resetDefaultProfile(new SparkConf)
+    } finally {
+      super.afterEach()
+    }
+  }
+
+  test("Default ResourceProfile") {
+    val rprof = ResourceProfile.getOrCreateDefaultProfile(new SparkConf)
+    assert(rprof.id === ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+    assert(rprof.executorResources.size === 2,
+      "Executor resources should contain cores and memory by default")
+    assert(rprof.executorResources(ResourceProfile.CORES).amount === 1,
+      s"Executor resources should have 1 core")
+    assert(rprof.executorResources(ResourceProfile.MEMORY).amount === 1024,
+      s"Executor resources should have 1024 memory")
+    assert(rprof.taskResources.size === 1,
+      "Task resources should just contain cpus by default")
+    assert(rprof.taskResources(ResourceProfile.CPUS).amount === 1,
+      s"Task resources should have 1 cpu")
+  }
+
+  test("Default ResourceProfile with app level resources specified") {
+    val conf = new SparkConf
+    conf.set("spark.task.resource.gpu.amount", "1")
+    conf.set(s"$SPARK_EXECUTOR_PREFIX.resource.gpu.amount", "1")
+    conf.set(s"$SPARK_EXECUTOR_PREFIX.resource.gpu.discoveryScript", "nameOfScript")
+    val rprof = ResourceProfile.getOrCreateDefaultProfile(conf)
+    assert(rprof.id === ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+    val execResources = rprof.executorResources
+    assert(execResources.size === 3,
+      "Executor resources should contain cores, memory, and gpu " + execResources)
+    assert(rprof.taskResources.size === 2,
+      "Task resources should just contain cpus and gpu")
+    assert(execResources.contains("resource.gpu"), "Executor resources should have gpu")
+    assert(rprof.taskResources.contains("resource.gpu"), "Task resources should have gpu")
+  }
+
+  test("Create ResourceProfile") {
+    val rprof = new ResourceProfile()
+    assert(rprof.id > ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+    assert(rprof.executorResources === Map.empty)
+    assert(rprof.taskResources === Map.empty)
+
+    val taskReq = new TaskResourceRequests().resource("resource.gpu", 1)
+    val eReq = new ExecutorResourceRequests().resource("resource.gpu", 2, "myscript", "nvidia")
+    rprof.require(taskReq).require(eReq)
+
+    assert(rprof.executorResources.size === 1)
+    assert(rprof.executorResources.contains("resource.gpu"),
+      "Executor resources should have gpu")
+    assert(rprof.executorResources.get("resource.gpu").get.vendor === "nvidia",
+      "gpu vendor should be nvidia")
+    assert(rprof.executorResources.get("resource.gpu").get.discoveryScript === "myscript",
+      "discoveryScript should be myscript")
+    assert(rprof.executorResources.get("resource.gpu").get.amount === 2,
+      "gpu amount should be 2")
+
+    assert(rprof.taskResources.size === 1, "Should have 1 task resource")
+    assert(rprof.taskResources.contains("resource.gpu"), "Task resources should have gpu")
+    assert(rprof.taskResources.get("resource.gpu").get.amount === 1,
+      "Task resources should have 1 gpu")
+
+    val ereqs = new ExecutorResourceRequests()
+    ereqs.cores(2).memory("4096")
+    ereqs.memoryOverhead("2048").pysparkMemory("1024")
+    val treqs = new TaskResourceRequests()
+    treqs.cpus(1)
+
+    rprof.require(treqs)
+    rprof.require(ereqs)
+
+    assert(rprof.executorResources.size === 5)
+    assert(rprof.executorResources(ResourceProfile.CORES).amount === 2,
+      "Executor resources should have 2 cores")
+    assert(rprof.executorResources(ResourceProfile.MEMORY).amount === 4096,
+      "Executor resources should have 4096 memory")
+    assert(rprof.executorResources(ResourceProfile.OVERHEAD_MEM).amount === 2048,
+      "Executor resources should have 2048 overhead memory")
+    assert(rprof.executorResources(ResourceProfile.PYSPARK_MEM).amount === 1024,
+      "Executor resources should have 1024 pyspark memory")
+
+    assert(rprof.taskResources.size === 2)
+    assert(rprof.taskResources("cpus").amount === 1, "Task resources should have cpu")
+
+    val error = intercept[IllegalArgumentException] {
+      rprof.require(new ExecutorResourceRequests().resource("bogusResource", 1))
+    }.getMessage()
+    assert(error.contains("Executor resource not allowed"))
+
+    val taskError = intercept[IllegalArgumentException] {
+      rprof.require(new TaskResourceRequests().resource("bogusTaskResource", 1))
+    }.getMessage()
+    assert(taskError.contains("Task resource not allowed"))
+  }
+
+  test("Test ExecutorResourceRequests memory helpers") {
+    val rprof = new ResourceProfile()
+    val ereqs = new ExecutorResourceRequests()
+    ereqs.memory("4g")
+    ereqs.memoryOverhead("2000m").pysparkMemory("512000k")
+    rprof.require(ereqs)
+
+    assert(rprof.executorResources(ResourceProfile.MEMORY).amount === 4096,
+      "Executor resources should have 4096 memory")
+    assert(rprof.executorResources(ResourceProfile.OVERHEAD_MEM).amount === 2000,
+      "Executor resources should have 2000 overhead memory")
+    assert(rprof.executorResources(ResourceProfile.PYSPARK_MEM).amount === 500,
+      "Executor resources should have 500 pyspark memory")
+  }
+
+  test("Test TaskResourceRequest fractional") {
+    val rprof = new ResourceProfile()
+    val treqs = new TaskResourceRequests().resource("resource.gpu", 0.33)
+    rprof.require(treqs)
+
+    assert(rprof.taskResources.size === 1, "Should have 1 task resource")
+    assert(rprof.taskResources.contains("resource.gpu"), "Task resources should have gpu")
+    assert(rprof.taskResources.get("resource.gpu").get.amount === 0.33,
+      "Task resources should have 0.33 gpu")
+
+    val fpgaReqs = new TaskResourceRequests().resource("resource.fpga", 4.0)
+    rprof.require(fpgaReqs)
+
+    assert(rprof.taskResources.size === 2, "Should have 2 task resources")
+    assert(rprof.taskResources.contains("resource.fpga"), "Task resources should have fpga")
+    assert(rprof.taskResources.get("resource.fpga").get.amount === 4.0,
+      "Task resources should have 4.0 fpga")
+
+    var taskError = intercept[AssertionError] {
+      rprof.require(new TaskResourceRequests().resource("resource.gpu", 1.5))
+    }.getMessage()
+    assert(taskError.contains("The resource amount 1.5 must be either <= 0.5, or a whole number."))
+
+    taskError = intercept[AssertionError] {
+      rprof.require(new TaskResourceRequests().resource("resource.gpu", 0.7))
+    }.getMessage()
+    assert(taskError.contains("The resource amount 0.7 must be either <= 0.5, or a whole number."))
+  }
+}
+

