Posted to commits@spark.apache.org by tg...@apache.org on 2019/11/25 15:37:05 UTC
[spark] branch master updated: [SPARK-29415][CORE] Stage Level Sched: Add base ResourceProfile and Request classes
This is an automated email from the ASF dual-hosted git repository.
tgraves pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 2d5de25 [SPARK-29415][CORE] Stage Level Sched: Add base ResourceProfile and Request classes
2d5de25 is described below
commit 2d5de25a999e0e5580cf4024937b61e6c9265672
Author: Thomas Graves <tg...@nvidia.com>
AuthorDate: Mon Nov 25 09:36:39 2019 -0600
[SPARK-29415][CORE] Stage Level Sched: Add base ResourceProfile and Request classes
### What changes were proposed in this pull request?
This PR adds the base classes needed for stage level scheduling: a ResourceProfile and the executor and task resource request classes. These are private for now, until all the parts are implemented, at which point they will become public interfaces. I am adding them first because all the other subtasks for this feature require these classes. If people have better ideas on breaking this feature up, please let me know.
See https://issues.apache.org/jira/browse/SPARK-29415 for more detailed design.
### Why are the changes needed?
New API for stage level scheduling. It's easier to add these first because the other JIRAs for this feature will all use them.
### Does this PR introduce any user-facing change?
Yes, it adds an API to create a ResourceProfile with executor/task resources; see the SPIP JIRA https://issues.apache.org/jira/browse/SPARK-27495
Example of the api:
val rp = new ResourceProfile()
rp.require(new ExecutorResourceRequests().cores(2))
rp.require(new ExecutorResourceRequests().resource("resource.gpu", 1, "/opt/gpuScripts/getGpus"))
rp.require(new TaskResourceRequests().resource("resource.gpu", 1))
### How was this patch tested?
Tested using Unit tests added with this PR.
Closes #26284 from tgravescs/SPARK-29415.
Authored-by: Thomas Graves <tg...@nvidia.com>
Signed-off-by: Thomas Graves <tg...@apache.org>
---
.../spark/resource/ExecutorResourceRequest.scala | 77 ++++++++++
.../spark/resource/ExecutorResourceRequests.scala | 122 +++++++++++++++
.../apache/spark/resource/ResourceProfile.scala | 147 ++++++++++++++++++
.../org/apache/spark/resource/ResourceUtils.scala | 7 +-
.../spark/resource/TaskResourceRequest.scala | 43 ++++++
.../spark/resource/TaskResourceRequests.scala | 75 ++++++++++
.../spark/resource/JavaResourceProfileSuite.java | 66 ++++++++
.../spark/resource/ResourceProfileSuite.scala | 166 +++++++++++++++++++++
8 files changed, 701 insertions(+), 2 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequest.scala b/core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequest.scala
new file mode 100644
index 0000000..88ceaad
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequest.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.resource
+
+import scala.collection.mutable
+
+import org.apache.spark.resource.ResourceUtils.RESOURCE_DOT
+
+/**
+ * An Executor resource request. This is used in conjunction with the ResourceProfile to
+ * programmatically specify the resources needed for an RDD that will be applied at the
+ * stage level.
+ *
+ * This is used to specify what the resource requirements are for an Executor and how
+ * Spark can find out specific details about those resources. Not all the parameters are
+ * required for every resource type. The resource names supported
+ * correspond to the regular Spark configs with the prefix removed. For instance overhead
+ * memory in this api is memoryOverhead, which is spark.executor.memoryOverhead with
+ * spark.executor removed. Resources like GPUs are resource.gpu
+ * (spark configs spark.executor.resource.gpu.*). The amount, discoveryScript, and vendor
+ * parameters for resources are all the same parameters a user would specify through the
+ * configs: spark.executor.resource.{resourceName}.{amount, discoveryScript, vendor}.
+ *
+ * For instance, a user wants to allocate an Executor with GPU resources on YARN. The user has
+ * to specify the resource name (resource.gpu) and the amount or number of GPUs per Executor.
+ * The discovery script would be specified so that when the Executor starts up it can
+ * discover what GPU addresses are available for it to use, because YARN doesn't tell
+ * Spark that. The vendor would not be used because it's specific to Kubernetes.
+ *
+ * See the configuration and cluster specific docs for more details.
+ *
+ * Use ExecutorResourceRequests class as a convenience API.
+ *
+ * @param resourceName Name of the resource
+ * @param amount Amount requesting
+ * @param discoveryScript Optional script used to discover the resources. This is required on some
+ * cluster managers that don't tell Spark the addresses of the resources
+ * allocated. The script runs on Executors startup to discover the addresses
+ * of the resources available.
+ * @param vendor Optional vendor, required for some cluster managers
+ *
+ * This api is currently private until the rest of the pieces are in place and then it
+ * will become public.
+ */
+private[spark] class ExecutorResourceRequest(
+ val resourceName: String,
+ val amount: Long,
+ val discoveryScript: String = "",
+ val vendor: String = "") extends Serializable {
+
+ // A list of allowed Spark internal resources. Custom resources (spark.executor.resource.*)
+ // like GPUs/FPGAs are also allowed, see the check below.
+ private val allowedExecutorResources = mutable.HashSet[String](
+ ResourceProfile.MEMORY,
+ ResourceProfile.OVERHEAD_MEM,
+ ResourceProfile.PYSPARK_MEM,
+ ResourceProfile.CORES)
+
+ if (!allowedExecutorResources.contains(resourceName) && !resourceName.startsWith(RESOURCE_DOT)) {
+ throw new IllegalArgumentException(s"Executor resource not allowed: $resourceName")
+ }
+}
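The name check at the end of ExecutorResourceRequest (and the matching one in TaskResourceRequest) can be tried out in isolation. The following is only a minimal standalone sketch of that rule, not the Spark classes themselves, which are `private[spark]` in this commit. The object name `ResourceNameCheck` and both helper names are hypothetical; the string values are copied from the constants in this diff.

```scala
object ResourceNameCheck {
  // Built-in executor resource names, copied from the ResourceProfile constants in this diff.
  val builtInExecutorResources: Set[String] =
    Set("cores", "memory", "memoryOverhead", "pyspark.memory")

  // Custom resources (GPUs, FPGAs, ...) must carry this prefix.
  val resourceDot: String = "resource."

  // Hypothetical helper: true when the name would pass the executor-side check.
  def isAllowedExecutorResource(name: String): Boolean =
    builtInExecutorResources.contains(name) || name.startsWith(resourceDot)

  // Hypothetical helper: the task-side check accepts only "cpus" or a prefixed name.
  def isAllowedTaskResource(name: String): Boolean =
    name == "cpus" || name.startsWith(resourceDot)
}
```

Under this rule a bare `"gpu"` is rejected while `"resource.gpu"` is accepted, and `"cores"` is valid for an executor request but not for a task request.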
diff --git a/core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequests.scala b/core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequests.scala
new file mode 100644
index 0000000..6ffcc0c
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequests.scala
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.resource
+
+import scala.collection.mutable
+
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.resource.ResourceProfile._
+
+/**
+ * A set of Executor resource requests. This is used in conjunction with the ResourceProfile to
+ * programmatically specify the resources needed for an RDD that will be applied at the
+ * stage level.
+ *
+ * This api is currently private until the rest of the pieces are in place and then it
+ * will become public.
+ */
+private[spark] class ExecutorResourceRequests() extends Serializable {
+
+ private val _executorResources = new mutable.HashMap[String, ExecutorResourceRequest]()
+
+ def requests: Map[String, ExecutorResourceRequest] = _executorResources.toMap
+
+ /**
+ * Specify heap memory. The value specified will be converted to MiB.
+ *
+ * @param amount Amount of memory. In the same format as JVM memory strings (e.g. 512m, 2g).
+ * Default unit is MiB if not specified.
+ */
+ def memory(amount: String): this.type = {
+ val amountMiB = JavaUtils.byteStringAsMb(amount)
+ val rr = new ExecutorResourceRequest(MEMORY, amountMiB)
+ _executorResources(MEMORY) = rr
+ this
+ }
+
+ /**
+ * Specify overhead memory. The value specified will be converted to MiB.
+ *
+ * @param amount Amount of memory. In the same format as JVM memory strings (e.g. 512m, 2g).
+ * Default unit is MiB if not specified.
+ */
+ def memoryOverhead(amount: String): this.type = {
+ val amountMiB = JavaUtils.byteStringAsMb(amount)
+ val rr = new ExecutorResourceRequest(OVERHEAD_MEM, amountMiB)
+ _executorResources(OVERHEAD_MEM) = rr
+ this
+ }
+
+ /**
+ * Specify pyspark memory. The value specified will be converted to MiB.
+ *
+ * @param amount Amount of memory. In the same format as JVM memory strings (e.g. 512m, 2g).
+ * Default unit is MiB if not specified.
+ */
+ def pysparkMemory(amount: String): this.type = {
+ val amountMiB = JavaUtils.byteStringAsMb(amount)
+ val rr = new ExecutorResourceRequest(PYSPARK_MEM, amountMiB)
+ _executorResources(PYSPARK_MEM) = rr
+ this
+ }
+
+ /**
+ * Specify number of cores per Executor.
+ *
+ * @param amount Number of cores to allocate per Executor.
+ */
+ def cores(amount: Int): this.type = {
+ val t = new ExecutorResourceRequest(CORES, amount)
+ _executorResources(CORES) = t
+ this
+ }
+
+ /**
+ * Amount of a particular custom resource(GPU, FPGA, etc) to use. The resource names supported
+ * correspond to the regular Spark configs with the prefix removed. For instance, resources
+ * like GPUs are resource.gpu (spark configs spark.executor.resource.gpu.*)
+ *
+ * @param resourceName Name of the resource.
+ * @param amount amount of that resource per executor to use.
+ * @param discoveryScript Optional script used to discover the resources. This is required on
+ * some cluster managers that don't tell Spark the addresses of
+ * the resources allocated. The script runs on Executors startup to discover
+ * the addresses of the resources available.
+ * @param vendor Optional vendor, required for some cluster managers
+ */
+ def resource(
+ resourceName: String,
+ amount: Long,
+ discoveryScript: String = "",
+ vendor: String = ""): this.type = {
+ // a bit weird but for Java api use empty string as meaning None because empty
+ // string is otherwise invalid for those parameters anyway
+ val eReq = new ExecutorResourceRequest(resourceName, amount, discoveryScript, vendor)
+ _executorResources(resourceName) = eReq
+ this
+ }
+
+ def addRequest(ereq: ExecutorResourceRequest): this.type = {
+ _executorResources(ereq.resourceName) = ereq
+ this
+ }
+
+ override def toString: String = {
+ s"Executor resource requests: ${_executorResources}"
+ }
+}
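The memory helpers above convert JVM-style memory strings to MiB through `JavaUtils.byteStringAsMb`. To make the conversion concrete, here is a simplified stand-in written for illustration only. It is an assumption-laden sketch: it handles just the single-letter suffixes `k`, `m`, `g`, `t` and bare numbers (treated as MiB), and the names `MemoryStrings` and `toMiB` are hypothetical, not part of Spark.

```scala
object MemoryStrings {
  // Number of KiB per unit suffix; a bare number is treated as MiB.
  private val kibPerUnit: Map[String, Long] = Map(
    "k" -> 1L,
    "m" -> 1024L,
    "g" -> 1024L * 1024L,
    "t" -> 1024L * 1024L * 1024L)

  private val pattern = "([0-9]+)([a-z]*)".r

  // Simplified stand-in for the real conversion: returns whole MiB, truncating any remainder.
  def toMiB(s: String): Long = s.trim.toLowerCase match {
    case pattern(digits, suffix) =>
      val unit = if (suffix.isEmpty) "m" else suffix
      val kib = kibPerUnit.getOrElse(unit,
        throw new NumberFormatException(s"Unsupported unit in: $s"))
      digits.toLong * kib / 1024L
    case _ =>
      throw new NumberFormatException(s"Cannot parse memory string: $s")
  }
}
```

With this sketch, `"4g"` becomes 4096, `"2000m"` stays 2000, and `"512000k"` becomes 500, which are the values the memory helper test in this commit expects.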
diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
new file mode 100644
index 0000000..876a655
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.resource
+
+import java.util.{Map => JMap}
+import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.SparkConf
+import org.apache.spark.annotation.Evolving
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.resource.ResourceUtils.RESOURCE_PREFIX
+
+/**
+ * Resource profile to associate with an RDD. A ResourceProfile allows the user to
+ * specify executor and task requirements for an RDD that will get applied during a
+ * stage. This allows the user to change the resource requirements between stages.
+ *
+ * This class is private now for initial development, once we have the feature in place
+ * this will become public.
+ */
+@Evolving
+private[spark] class ResourceProfile() extends Serializable {
+
+ private val _id = ResourceProfile.getNextProfileId
+ private val _taskResources = new mutable.HashMap[String, TaskResourceRequest]()
+ private val _executorResources = new mutable.HashMap[String, ExecutorResourceRequest]()
+
+ def id: Int = _id
+ def taskResources: Map[String, TaskResourceRequest] = _taskResources.toMap
+ def executorResources: Map[String, ExecutorResourceRequest] = _executorResources.toMap
+
+ /**
+ * (Java-specific) gets a Java Map of resources to TaskResourceRequest
+ */
+ def taskResourcesJMap: JMap[String, TaskResourceRequest] = _taskResources.asJava
+
+ /**
+ * (Java-specific) gets a Java Map of resources to ExecutorResourceRequest
+ */
+ def executorResourcesJMap: JMap[String, ExecutorResourceRequest] = _executorResources.asJava
+
+ def reset(): Unit = {
+ _taskResources.clear()
+ _executorResources.clear()
+ }
+
+ def require(requests: ExecutorResourceRequests): this.type = {
+ _executorResources ++= requests.requests
+ this
+ }
+
+ def require(requests: TaskResourceRequests): this.type = {
+ _taskResources ++= requests.requests
+ this
+ }
+
+ override def toString(): String = {
+ s"Profile: id = ${_id}, executor resources: ${_executorResources}, " +
+ s"task resources: ${_taskResources}"
+ }
+}
+
+private[spark] object ResourceProfile extends Logging {
+ val UNKNOWN_RESOURCE_PROFILE_ID = -1
+ val DEFAULT_RESOURCE_PROFILE_ID = 0
+
+ val CPUS = "cpus"
+ val CORES = "cores"
+ val MEMORY = "memory"
+ val OVERHEAD_MEM = "memoryOverhead"
+ val PYSPARK_MEM = "pyspark.memory"
+
+ private lazy val nextProfileId = new AtomicInteger(0)
+
+ // The default resource profile uses the application level configs.
+ // Create the default profile immediately to get ID 0, it's initialized later when fetched.
+ private val defaultProfileRef: AtomicReference[ResourceProfile] =
+ new AtomicReference[ResourceProfile](new ResourceProfile())
+
+ assert(defaultProfileRef.get().id == DEFAULT_RESOURCE_PROFILE_ID,
+ s"Default Profile must have the default profile id: $DEFAULT_RESOURCE_PROFILE_ID")
+
+ def getNextProfileId: Int = nextProfileId.getAndIncrement()
+
+ def getOrCreateDefaultProfile(conf: SparkConf): ResourceProfile = {
+ val defaultProf = defaultProfileRef.get()
+ // check to see if the default profile was initialized yet
+ if (defaultProf.executorResources == Map.empty) {
+ synchronized {
+ val prof = defaultProfileRef.get()
+ if (prof.executorResources == Map.empty) {
+ addDefaultTaskResources(prof, conf)
+ addDefaultExecutorResources(prof, conf)
+ }
+ prof
+ }
+ } else {
+ defaultProf
+ }
+ }
+
+ private def addDefaultTaskResources(rprof: ResourceProfile, conf: SparkConf): Unit = {
+ val cpusPerTask = conf.get(CPUS_PER_TASK)
+ val treqs = new TaskResourceRequests().cpus(cpusPerTask)
+ val taskReq = ResourceUtils.parseResourceRequirements(conf, SPARK_TASK_PREFIX)
+ taskReq.foreach { req =>
+ val name = s"${RESOURCE_PREFIX}.${req.resourceName}"
+ treqs.resource(name, req.amount)
+ }
+ rprof.require(treqs)
+ }
+
+ private def addDefaultExecutorResources(rprof: ResourceProfile, conf: SparkConf): Unit = {
+ val ereqs = new ExecutorResourceRequests()
+ ereqs.cores(conf.get(EXECUTOR_CORES))
+ ereqs.memory(conf.get(EXECUTOR_MEMORY).toString)
+ val execReq = ResourceUtils.parseAllResourceRequests(conf, SPARK_EXECUTOR_PREFIX)
+ execReq.foreach { req =>
+ val name = s"${RESOURCE_PREFIX}.${req.id.resourceName}"
+ ereqs.resource(name, req.amount, req.discoveryScript.getOrElse(""),
+ req.vendor.getOrElse(""))
+ }
+ rprof.require(ereqs)
+ }
+
+ // for testing purposes
+ def resetDefaultProfile(conf: SparkConf): Unit = getOrCreateDefaultProfile(conf).reset()
+}
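The companion object above creates the default profile eagerly so that it receives id 0, then fills it in on first fetch with a check, lock, re-check sequence. The sketch below isolates that pattern in a standalone form. It is not the Spark code: `DefaultProfileSketch`, `Profile`, `initCount`, and the two-argument `getOrCreateDefault` are hypothetical names, and plain numbers stand in for the values read from `SparkConf`.

```scala
import java.util.concurrent.atomic.AtomicInteger

object DefaultProfileSketch {
  private val nextId = new AtomicInteger(0)

  // Hypothetical stand-in for ResourceProfile: takes the next id when constructed.
  final class Profile {
    val id: Int = nextId.getAndIncrement()
    @volatile var executorResources: Map[String, Long] = Map.empty
  }

  // Created eagerly so that it receives id 0; its contents are filled in on first fetch.
  private val defaultProfile = new Profile

  // Counts how often the defaults were computed, only to make the behaviour observable.
  val initCount = new AtomicInteger(0)

  def getOrCreateDefault(cores: Long, memoryMiB: Long): Profile = {
    if (defaultProfile.executorResources.isEmpty) {
      synchronized {
        // Re-check under the lock so that only one caller fills in the defaults.
        if (defaultProfile.executorResources.isEmpty) {
          initCount.incrementAndGet()
          defaultProfile.executorResources = Map("cores" -> cores, "memory" -> memoryMiB)
        }
      }
    }
    defaultProfile
  }
}
```

A second call with different arguments returns the same, already populated profile, which mirrors why the test suite in this commit resets the default profile after each test.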
diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala b/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala
index e5ae7a9..ce4fd05 100644
--- a/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala
@@ -36,7 +36,7 @@ import org.apache.spark.util.Utils.executeAndGetOutput
* @param resourceName gpu, fpga, etc
*/
private[spark] case class ResourceID(componentName: String, resourceName: String) {
- def confPrefix: String = s"$componentName.resource.$resourceName." // with ending dot
+ def confPrefix: String = s"$componentName.${ResourceUtils.RESOURCE_PREFIX}.$resourceName."
def amountConf: String = s"$confPrefix${ResourceUtils.AMOUNT}"
def discoveryScriptConf: String = s"$confPrefix${ResourceUtils.DISCOVERY_SCRIPT}"
def vendorConf: String = s"$confPrefix${ResourceUtils.VENDOR}"
@@ -111,7 +111,7 @@ private[spark] object ResourceUtils extends Logging {
}
def listResourceIds(sparkConf: SparkConf, componentName: String): Seq[ResourceID] = {
- sparkConf.getAllWithPrefix(s"$componentName.resource.").map { case (key, _) =>
+ sparkConf.getAllWithPrefix(s"$componentName.$RESOURCE_DOT").map { case (key, _) =>
key.substring(0, key.indexOf('.'))
}.toSet.toSeq.map(name => ResourceID(componentName, name))
}
@@ -258,4 +258,7 @@ private[spark] object ResourceUtils extends Logging {
// known types of resources
final val GPU: String = "gpu"
final val FPGA: String = "fpga"
+
+ final val RESOURCE_PREFIX: String = "resource"
+ final val RESOURCE_DOT: String = s"$RESOURCE_PREFIX."
}
diff --git a/core/src/main/scala/org/apache/spark/resource/TaskResourceRequest.scala b/core/src/main/scala/org/apache/spark/resource/TaskResourceRequest.scala
new file mode 100644
index 0000000..22eda52
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/resource/TaskResourceRequest.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.resource
+
+import scala.collection.mutable
+
+import org.apache.spark.resource.ResourceUtils.RESOURCE_DOT
+
+/**
+ * A task resource request. This is used in conjunction with the ResourceProfile to
+ * programmatically specify the resources needed for an RDD that will be applied at the
+ * stage level.
+ *
+ * Use TaskResourceRequests class as a convenience API.
+ *
+ * This api is currently private until the rest of the pieces are in place and then it
+ * will become public.
+ */
+private[spark] class TaskResourceRequest(val resourceName: String, val amount: Double)
+ extends Serializable {
+
+ assert(amount <= 0.5 || amount % 1 == 0,
+ s"The resource amount ${amount} must be either <= 0.5, or a whole number.")
+
+ if (!resourceName.equals(ResourceProfile.CPUS) && !resourceName.startsWith(RESOURCE_DOT)) {
+ throw new IllegalArgumentException(s"Task resource not allowed: $resourceName")
+ }
+}
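The amount assertion in TaskResourceRequest accepts fractions up to one half or whole numbers. The standalone sketch below restates that predicate and adds a hypothetical `tasksPerAddress` helper. The helper rests on an assumption that is not part of this commit: that a fractional amount lets floor(1 / amount) tasks share one resource address, which agrees with the documented case of 0.5 meaning 2 tasks per address.

```scala
object TaskAmounts {
  // Same rule as the assertion in the diff: fractions up to one half, or whole numbers.
  def isValidAmount(amount: Double): Boolean =
    amount <= 0.5 || amount % 1 == 0

  // Assumption: a fractional amount shares one address among floor(1 / amount) tasks.
  def tasksPerAddress(amount: Double): Int = {
    require(amount > 0 && amount <= 0.5, s"Expected a fraction in (0, 0.5], got $amount")
    math.floor(1.0 / amount).toInt
  }
}
```

Note that the predicate as written also accepts zero and negative amounts, since both satisfy `amount <= 0.5`; a caller that needs strictly positive amounts would have to check that separately.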
diff --git a/core/src/main/scala/org/apache/spark/resource/TaskResourceRequests.scala b/core/src/main/scala/org/apache/spark/resource/TaskResourceRequests.scala
new file mode 100644
index 0000000..21cbc5d
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/resource/TaskResourceRequests.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.resource
+
+import scala.collection.mutable
+
+import org.apache.spark.resource.ResourceProfile._
+import org.apache.spark.resource.ResourceUtils._
+
+/**
+ * A set of task resource requests. This is used in conjunction with the ResourceProfile to
+ * programmatically specify the resources needed for an RDD that will be applied at the
+ * stage level.
+ *
+ * This api is currently private until the rest of the pieces are in place and then it
+ * will become public.
+ */
+private[spark] class TaskResourceRequests() extends Serializable {
+
+ private val _taskResources = new mutable.HashMap[String, TaskResourceRequest]()
+
+ def requests: Map[String, TaskResourceRequest] = _taskResources.toMap
+
+ /**
+ * Specify number of cpus per Task.
+ *
+ * @param amount Number of cpus to allocate per Task.
+ */
+ def cpus(amount: Int): this.type = {
+ val t = new TaskResourceRequest(CPUS, amount)
+ _taskResources(CPUS) = t
+ this
+ }
+
+ /**
+ * Amount of a particular custom resource(GPU, FPGA, etc) to use. The resource names supported
+ * correspond to the regular Spark configs with the prefix removed. For instance, resources
+ * like GPUs are resource.gpu (spark configs spark.task.resource.gpu.*)
+ *
+ * @param rName Name of the resource.
+ * @param amount Amount requesting as a Double to support fractional resource requests.
+ * Valid values are less than or equal to 0.5 or whole numbers. This essentially
+ * lets you configure X number of tasks to run on a single resource,
+ * i.e. an amount of 0.5 translates into 2 tasks per resource address.
+ */
+ def resource(rName: String, amount: Double): this.type = {
+ val t = new TaskResourceRequest(rName, amount)
+ _taskResources(rName) = t
+ this
+ }
+
+ def addRequest(treq: TaskResourceRequest): this.type = {
+ _taskResources(treq.resourceName) = treq
+ this
+ }
+
+ override def toString: String = {
+ s"Task resource requests: ${_taskResources}"
+ }
+}
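Both `*Requests` classes follow the same shape: a mutable map keyed by resource name, with setters that return `this.type` so calls can be chained. The miniature below illustrates that shape on its own. It is a sketch with hypothetical names (`FluentRequests`, `Requests`) that stores plain `Double` amounts instead of request objects.

```scala
import scala.collection.mutable

object FluentRequests {
  // Hypothetical miniature of the *Requests builders: a keyed, chainable collection.
  class Requests {
    private val entries = new mutable.HashMap[String, Double]()

    // Immutable snapshot of what has been requested so far.
    def requests: Map[String, Double] = entries.toMap

    def cpus(amount: Int): this.type = {
      entries("cpus") = amount.toDouble
      this
    }

    def resource(name: String, amount: Double): this.type = {
      entries(name) = amount
      this
    }
  }
}
```

Because entries are keyed by name, requesting the same resource twice keeps only the last amount, and `requests` hands back a copy that later calls do not modify.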
diff --git a/core/src/test/java/org/apache/spark/resource/JavaResourceProfileSuite.java b/core/src/test/java/org/apache/spark/resource/JavaResourceProfileSuite.java
new file mode 100644
index 0000000..0771207
--- /dev/null
+++ b/core/src/test/java/org/apache/spark/resource/JavaResourceProfileSuite.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.resource;
+
+import java.util.Map;
+
+import static org.junit.Assert.*;
+import org.junit.Test;
+
+// Test the ResourceProfile and Request APIs from Java
+public class JavaResourceProfileSuite {
+
+ String GpuResource = "resource.gpu";
+ String FPGAResource = "resource.fpga";
+
+ @Test
+ public void testResourceProfileAccessFromJava() throws Exception {
+ ExecutorResourceRequests execReqGpu =
+ new ExecutorResourceRequests().resource(GpuResource, 2,"myscript", "");
+ ExecutorResourceRequests execReqFpga =
+ new ExecutorResourceRequests().resource(FPGAResource, 3, "myfpgascript", "nvidia");
+
+ ResourceProfile rprof = new ResourceProfile();
+ rprof.require(execReqGpu);
+ rprof.require(execReqFpga);
+ TaskResourceRequests taskReq1 = new TaskResourceRequests().resource(GpuResource, 1);
+ rprof.require(taskReq1);
+
+ assertEquals(rprof.executorResources().size(), 2);
+ Map<String, ExecutorResourceRequest> eresources = rprof.executorResourcesJMap();
+ assert(eresources.containsKey(GpuResource));
+ ExecutorResourceRequest gpuReq = eresources.get(GpuResource);
+ assertEquals(gpuReq.amount(), 2);
+ assertEquals(gpuReq.discoveryScript(), "myscript");
+ assertEquals(gpuReq.vendor(), "");
+
+ assert(eresources.containsKey(FPGAResource));
+ ExecutorResourceRequest fpgaReq = eresources.get(FPGAResource);
+ assertEquals(fpgaReq.amount(), 3);
+ assertEquals(fpgaReq.discoveryScript(), "myfpgascript");
+ assertEquals(fpgaReq.vendor(), "nvidia");
+
+ assertEquals(rprof.taskResources().size(), 1);
+ Map<String, TaskResourceRequest> tresources = rprof.taskResourcesJMap();
+ assert(tresources.containsKey(GpuResource));
+ TaskResourceRequest taskReq = tresources.get(GpuResource);
+ assertEquals(taskReq.amount(), 1.0, 0);
+ assertEquals(taskReq.resourceName(), GpuResource);
+ }
+}
+
diff --git a/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala b/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala
new file mode 100644
index 0000000..a087f18
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.resource
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.internal.config._
+
+class ResourceProfileSuite extends SparkFunSuite {
+
+ override def afterEach() {
+ try {
+ ResourceProfile.resetDefaultProfile(new SparkConf)
+ } finally {
+ super.afterEach()
+ }
+ }
+
+ test("Default ResourceProfile") {
+ val rprof = ResourceProfile.getOrCreateDefaultProfile(new SparkConf)
+ assert(rprof.id === ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+ assert(rprof.executorResources.size === 2,
+ "Executor resources should contain cores and memory by default")
+ assert(rprof.executorResources(ResourceProfile.CORES).amount === 1,
+ s"Executor resources should have 1 core")
+ assert(rprof.executorResources(ResourceProfile.MEMORY).amount === 1024,
+ s"Executor resources should have 1024 memory")
+ assert(rprof.taskResources.size === 1,
+ "Task resources should just contain cpus by default")
+ assert(rprof.taskResources(ResourceProfile.CPUS).amount === 1,
+ s"Task resources should have 1 cpu")
+ }
+
+ test("Default ResourceProfile with app level resources specified") {
+ val conf = new SparkConf
+ conf.set("spark.task.resource.gpu.amount", "1")
+ conf.set(s"$SPARK_EXECUTOR_PREFIX.resource.gpu.amount", "1")
+ conf.set(s"$SPARK_EXECUTOR_PREFIX.resource.gpu.discoveryScript", "nameOfScript")
+ val rprof = ResourceProfile.getOrCreateDefaultProfile(conf)
+ assert(rprof.id === ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+ val execResources = rprof.executorResources
+ assert(execResources.size === 3,
+ "Executor resources should contain cores, memory, and gpu " + execResources)
+ assert(rprof.taskResources.size === 2,
+ "Task resources should just contain cpus and gpu")
+ assert(execResources.contains("resource.gpu"), "Executor resources should have gpu")
+ assert(rprof.taskResources.contains("resource.gpu"), "Task resources should have gpu")
+ }
+
+ test("Create ResourceProfile") {
+ val rprof = new ResourceProfile()
+ assert(rprof.id > ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+ assert(rprof.executorResources === Map.empty)
+ assert(rprof.taskResources === Map.empty)
+
+ val taskReq = new TaskResourceRequests().resource("resource.gpu", 1)
+ val eReq = new ExecutorResourceRequests().resource("resource.gpu", 2, "myscript", "nvidia")
+ rprof.require(taskReq).require(eReq)
+
+ assert(rprof.executorResources.size === 1)
+ assert(rprof.executorResources.contains("resource.gpu"),
+ "Executor resources should have gpu")
+ assert(rprof.executorResources.get("resource.gpu").get.vendor === "nvidia",
+ "gpu vendor should be nvidia")
+ assert(rprof.executorResources.get("resource.gpu").get.discoveryScript === "myscript",
+ "discoveryScript should be myscript")
+ assert(rprof.executorResources.get("resource.gpu").get.amount === 2,
+ "gpu amount should be 2")
+
+ assert(rprof.taskResources.size === 1, "Should have 1 task resource")
+ assert(rprof.taskResources.contains("resource.gpu"), "Task resources should have gpu")
+ assert(rprof.taskResources.get("resource.gpu").get.amount === 1,
+ "Task resources should have 1 gpu")
+
+ val ereqs = new ExecutorResourceRequests()
+ ereqs.cores(2).memory("4096")
+ ereqs.memoryOverhead("2048").pysparkMemory("1024")
+ val treqs = new TaskResourceRequests()
+ treqs.cpus(1)
+
+ rprof.require(treqs)
+ rprof.require(ereqs)
+
+ assert(rprof.executorResources.size === 5)
+ assert(rprof.executorResources(ResourceProfile.CORES).amount === 2,
+ s"Executor resources should have 2 cores")
+ assert(rprof.executorResources(ResourceProfile.MEMORY).amount === 4096,
+ s"Executor resources should have 4096 memory")
+ assert(rprof.executorResources(ResourceProfile.OVERHEAD_MEM).amount === 2048,
+ s"Executor resources should have 2048 overhead memory")
+ assert(rprof.executorResources(ResourceProfile.PYSPARK_MEM).amount === 1024,
+ s"Executor resources should have 1024 pyspark memory")
+
+ assert(rprof.taskResources.size === 2)
+ assert(rprof.taskResources("cpus").amount === 1, "Task resources should have cpu")
+
+ val error = intercept[IllegalArgumentException] {
+ rprof.require(new ExecutorResourceRequests().resource("bogusResource", 1))
+ }.getMessage()
+ assert(error.contains("Executor resource not allowed"))
+
+ val taskError = intercept[IllegalArgumentException] {
+ rprof.require(new TaskResourceRequests().resource("bogusTaskResource", 1))
+ }.getMessage()
+ assert(taskError.contains("Task resource not allowed"))
+ }
+
+ test("Test ExecutorResourceRequests memory helpers") {
+ val rprof = new ResourceProfile()
+ val ereqs = new ExecutorResourceRequests()
+ ereqs.memory("4g")
+ ereqs.memoryOverhead("2000m").pysparkMemory("512000k")
+ rprof.require(ereqs)
+
+ assert(rprof.executorResources(ResourceProfile.MEMORY).amount === 4096,
+ s"Executor resources should have 4096 memory")
+ assert(rprof.executorResources(ResourceProfile.OVERHEAD_MEM).amount === 2000,
+ s"Executor resources should have 2000 overhead memory")
+ assert(rprof.executorResources(ResourceProfile.PYSPARK_MEM).amount === 500,
+ s"Executor resources should have 500 pyspark memory")
+ }
+
+ test("Test TaskResourceRequest fractional") {
+ val rprof = new ResourceProfile()
+ val treqs = new TaskResourceRequests().resource("resource.gpu", 0.33)
+ rprof.require(treqs)
+
+ assert(rprof.taskResources.size === 1, "Should have 1 task resource")
+ assert(rprof.taskResources.contains("resource.gpu"), "Task resources should have gpu")
+ assert(rprof.taskResources.get("resource.gpu").get.amount === 0.33,
+ "Task resources should have 0.33 gpu")
+
+ val fpgaReqs = new TaskResourceRequests().resource("resource.fpga", 4.0)
+ rprof.require(fpgaReqs)
+
+ assert(rprof.taskResources.size === 2, "Should have 2 task resources")
+ assert(rprof.taskResources.contains("resource.fpga"), "Task resources should have fpga")
+ assert(rprof.taskResources.get("resource.fpga").get.amount === 4.0,
+ "Task resources should have 4.0 fpga")
+
+ var taskError = intercept[AssertionError] {
+ rprof.require(new TaskResourceRequests().resource("resource.gpu", 1.5))
+ }.getMessage()
+ assert(taskError.contains("The resource amount 1.5 must be either <= 0.5, or a whole number."))
+
+ taskError = intercept[AssertionError] {
+ rprof.require(new TaskResourceRequests().resource("resource.gpu", 0.7))
+ }.getMessage()
+ assert(taskError.contains("The resource amount 0.7 must be either <= 0.5, or a whole number."))
+ }
+}
+