Posted to commits@spark.apache.org by tg...@apache.org on 2019/05/31 20:26:59 UTC

[spark] branch master updated: [SPARK-27362][K8S] Resource Scheduling support for k8s

This is an automated email from the ASF dual-hosted git repository.

tgraves pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 1277f8f  [SPARK-27362][K8S] Resource Scheduling support for k8s
1277f8f is described below

commit 1277f8fa92da85d9e39d9146e3099fcb75c71a8f
Author: Thomas Graves <tg...@nvidia.com>
AuthorDate: Fri May 31 15:26:14 2019 -0500

    [SPARK-27362][K8S] Resource Scheduling support for k8s
    
    ## What changes were proposed in this pull request?
    
    Add the ability to map the Spark resource configs spark.{executor/driver}.resource.{resourceName} to the Kubernetes Container builder so that we request resources (GPUs/FPGAs/etc.) from Kubernetes.
    Note that the Spark configs will overwrite any resource configs users put into a pod template.
    I added a generic vendor config which is only used by Kubernetes right now. I intentionally didn't put it into the Kubernetes config namespace, to avoid adding more config prefixes.
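    
    For illustration, a minimal sketch of the configs this maps (the resource
    name "gpu" and the vendor values are examples only):
    
        val conf = new SparkConf()
          .set("spark.driver.resource.gpu.count", "1")
          .set("spark.driver.resource.gpu.vendor", "nvidia.com")
          .set("spark.executor.resource.gpu.count", "2")
          .set("spark.executor.resource.gpu.vendor", "nvidia.com")
        // On Kubernetes these become container resource limits keyed by
        // vendor-domain/resource, e.g. "nvidia.com/gpu" -> 2.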
    
    I will add more documentation for this under JIRA SPARK-27492. I think it will be easier to do it all at once to get a cohesive story.
    
    ## How was this patch tested?
    
    Unit tests and manually testing on k8s cluster.
    
    Closes #24703 from tgravescs/SPARK-27362.
    
    Authored-by: Thomas Graves <tg...@nvidia.com>
    Signed-off-by: Thomas Graves <tg...@apache.org>
---
 .../org/apache/spark/internal/config/package.scala |  1 +
 docs/configuration.md                              | 20 ++++++++
 docs/running-on-kubernetes.md                      |  1 +
 .../apache/spark/deploy/k8s/KubernetesConf.scala   |  9 ++++
 .../apache/spark/deploy/k8s/KubernetesUtils.scala  | 29 ++++++++++-
 .../k8s/features/BasicDriverFeatureStep.scala      |  4 ++
 .../k8s/features/BasicExecutorFeatureStep.scala    | 13 +++--
 .../k8s/features/BasicDriverFeatureStepSuite.scala | 14 +++++-
 .../features/BasicExecutorFeatureStepSuite.scala   | 56 +++++++++++++++++++++-
 .../k8s/features/KubernetesFeaturesTestUtils.scala |  2 +
 10 files changed, 142 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index a5d36b5..8ea8887 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -37,6 +37,7 @@ package object config {
 
   private[spark] val SPARK_RESOURCE_COUNT_SUFFIX = ".count"
   private[spark] val SPARK_RESOURCE_DISCOVERY_SCRIPT_SUFFIX = ".discoveryScript"
+  private[spark] val SPARK_RESOURCE_VENDOR_SUFFIX = ".vendor"
 
   private[spark] val DRIVER_RESOURCES_FILE =
     ConfigBuilder("spark.driver.resourcesFile")
diff --git a/docs/configuration.md b/docs/configuration.md
index 2169951..24e66e1 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -207,6 +207,16 @@ of the most common options to set are:
   </td>
 </tr>
 <tr>
+ <td><code>spark.driver.resource.{resourceName}.vendor</code></td>
+  <td>None</td>
+  <td>
+    Vendor of the resources to use for the driver. This option is currently
+    only supported on Kubernetes, where the value is the vendor domain following
+    the Kubernetes device plugin naming convention (e.g. for GPUs on Kubernetes
+    this config would be set to nvidia.com or amd.com).
+  </td>
+</tr>
+<tr>
   <td><code>spark.executor.memory</code></td>
   <td>1g</td>
   <td>
@@ -260,6 +270,16 @@ of the most common options to set are:
   </td>
 </tr>
 <tr>
+ <td><code>spark.executor.resource.{resourceName}.vendor</code></td>
+  <td>None</td>
+  <td>
+    Vendor of the resources to use for the executors. This option is currently
+    only supported on Kubernetes, where the value is the vendor domain following
+    the Kubernetes device plugin naming convention (e.g. for GPUs on Kubernetes
+    this config would be set to nvidia.com or amd.com).
+  </td>
+</tr>
+<tr>
   <td><code>spark.extraListeners</code></td>
   <td>(none)</td>
   <td>
diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index 8a424b5..d4efb52 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -1246,6 +1246,7 @@ The following affect the driver and executor containers. All other containers in
     The cpu limits are set by <code>spark.kubernetes.{driver,executor}.limit.cores</code>. The cpu is set by
     <code>spark.{driver,executor}.cores</code>. The memory request and limit are set by summing the values of
     <code>spark.{driver,executor}.memory</code> and <code>spark.{driver,executor}.memoryOverhead</code>.
+    Other resource limits are set by <code>spark.{driver,executor}.resource.{resourceName}.*</code> configs.
   </td>
 </tr>
 <tr>
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
index 5e74111..a2a4661 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
@@ -199,4 +199,13 @@ private[spark] object KubernetesConf {
       .replaceAll("[^a-z0-9\\-]", "")
       .replaceAll("-+", "-")
   }
+
+  /**
+   * Build a resources name based on the vendor device plugin naming
+   * convention of: vendor-domain/resource. For example, an NVIDIA GPU is
+   * advertised as nvidia.com/gpu.
+   */
+  def buildKubernetesResourceName(vendorDomain: String, resourceName: String): String = {
+    s"${vendorDomain}/${resourceName}"
+  }
 }
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
index a571035..522c8f7 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
@@ -23,7 +23,7 @@ import java.util.UUID
 
 import scala.collection.JavaConverters._
 
-import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod, PodBuilder}
+import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod, PodBuilder, Quantity, QuantityBuilder}
 import io.fabric8.kubernetes.client.KubernetesClient
 import org.apache.commons.codec.binary.Hex
 import org.apache.hadoop.fs.{FileSystem, Path}
@@ -32,6 +32,7 @@ import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.k8s.Config.KUBERNETES_FILE_UPLOAD_PATH
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.{SPARK_RESOURCE_COUNT_SUFFIX, SPARK_RESOURCE_VENDOR_SUFFIX}
 import org.apache.spark.launcher.SparkLauncher
 import org.apache.spark.util.{Clock, SystemClock, Utils}
 import org.apache.spark.util.Utils.getHadoopFileSystem
@@ -217,6 +218,32 @@ private[spark] object KubernetesUtils extends Logging {
   }
 
   /**
+   * This function builds the Quantity objects for each resource in the Spark resource
+   * configs based on the component name (spark.driver.resource or spark.executor.resource).
+   * It assumes we can use the Kubernetes device plugin format: vendor-domain/resource.
+   * It returns a map of vendor-domain/resource to Quantity, with one entry per resource.
+   */
+  def buildResourcesQuantities(
+      componentName: String,
+      sparkConf: SparkConf): Map[String, Quantity] = {
+    val allResources = sparkConf.getAllWithPrefix(componentName)
+    val vendors = SparkConf.getConfigsWithSuffix(allResources, SPARK_RESOURCE_VENDOR_SUFFIX).toMap
+    val amounts = SparkConf.getConfigsWithSuffix(allResources, SPARK_RESOURCE_COUNT_SUFFIX).toMap
+    val uniqueResources = SparkConf.getBaseOfConfigs(allResources)
+
+    uniqueResources.map { rName =>
+      val vendorDomain = vendors.get(rName).getOrElse(throw new SparkException("Resource: " +
+        s"$rName was requested, but vendor was not specified."))
+      val amount = amounts.get(rName).getOrElse(throw new SparkException(s"Resource: $rName " +
+        "was requested, but count was not specified."))
+      val quantity = new QuantityBuilder(false)
+        .withAmount(amount)
+        .build()
+      (KubernetesConf.buildKubernetesResourceName(vendorDomain, rName), quantity)
+    }.toMap
+  }
+
+  /**
    * Upload files and modify their uris
    */
   def uploadAndTransformFileUris(fileUris: Iterable[String], conf: Option[SparkConf] = None)
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
index 92463df..d10f69f 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
@@ -88,6 +88,9 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
       ("cpu", new QuantityBuilder(false).withAmount(limitCores).build())
     }
 
+    val driverResourceQuantities =
+      KubernetesUtils.buildResourcesQuantities(SPARK_DRIVER_RESOURCE_PREFIX, conf.sparkConf)
+
     val driverPort = conf.sparkConf.getInt(DRIVER_PORT.key, DEFAULT_DRIVER_PORT)
     val driverBlockManagerPort = conf.sparkConf.getInt(
       DRIVER_BLOCK_MANAGER_PORT.key,
@@ -129,6 +132,7 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
         .addToLimits(maybeCpuLimitQuantity.toMap.asJava)
         .addToRequests("memory", driverMemoryQuantity)
         .addToLimits("memory", driverMemoryQuantity)
+        .addToLimits(driverResourceQuantities.asJava)
         .endResources()
       .build()
 
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
index af16283..d46a9b8 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
@@ -95,6 +95,10 @@ private[spark] class BasicExecutorFeatureStep(
       .withAmount(executorCoresRequest)
       .build()
 
+    val executorResourceQuantities =
+      KubernetesUtils.buildResourcesQuantities(SPARK_EXECUTOR_RESOURCE_PREFIX,
+        kubernetesConf.sparkConf)
+
     val executorEnv: Seq[EnvVar] = {
         (Seq(
           (ENV_DRIVER_URL, driverUrl),
@@ -168,11 +172,12 @@ private[spark] class BasicExecutorFeatureStep(
         .addToRequests("memory", executorMemoryQuantity)
         .addToLimits("memory", executorMemoryQuantity)
         .addToRequests("cpu", executorCpuQuantity)
+        .addToLimits(executorResourceQuantities.asJava)
         .endResources()
-        .addNewEnv()
-          .withName(ENV_SPARK_USER)
-          .withValue(Utils.getCurrentUserName())
-          .endEnv()
+      .addNewEnv()
+        .withName(ENV_SPARK_USER)
+        .withValue(Utils.getCurrentUserName())
+        .endEnv()
       .addAllToEnv(executorEnv.asJava)
       .withPorts(requiredPorts.asJava)
       .addToArgs("executor")
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
index 92f46c6..f60c6fb 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
@@ -24,10 +24,10 @@ import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.k8s.{KubernetesTestConf, SparkPod}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.features.KubernetesFeaturesTestUtils.TestResourceInformation
 import org.apache.spark.deploy.k8s.submit._
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.UI._
-import org.apache.spark.ui.SparkUI
 import org.apache.spark.util.Utils
 
 class BasicDriverFeatureStepSuite extends SparkFunSuite {
@@ -45,6 +45,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
     }
 
   test("Check the pod respects all configurations from the user.") {
+    val resources = Map(("nvidia.com/gpu" -> TestResourceInformation("gpu", "2", "nvidia.com")))
     val sparkConf = new SparkConf()
       .set(KUBERNETES_DRIVER_POD_NAME, "spark-driver-pod")
       .set(DRIVER_CORES, 2)
@@ -53,6 +54,14 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
       .set(DRIVER_MEMORY_OVERHEAD, 200L)
       .set(CONTAINER_IMAGE, "spark-driver:latest")
       .set(IMAGE_PULL_SECRETS, TEST_IMAGE_PULL_SECRETS)
+    resources.foreach { case (_, testRInfo) =>
+      sparkConf.set(
+        s"${SPARK_DRIVER_RESOURCE_PREFIX}${testRInfo.rName}${SPARK_RESOURCE_COUNT_SUFFIX}",
+        testRInfo.count)
+      sparkConf.set(
+        s"${SPARK_DRIVER_RESOURCE_PREFIX}${testRInfo.rName}${SPARK_RESOURCE_VENDOR_SUFFIX}",
+        testRInfo.vendor)
+    }
     val kubernetesConf = KubernetesTestConf.createDriverConf(
       sparkConf = sparkConf,
       labels = DRIVER_LABELS,
@@ -100,6 +109,9 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
     val limits = resourceRequirements.getLimits.asScala
     assert(limits("memory").getAmount === "456Mi")
     assert(limits("cpu").getAmount === "4")
+    resources.foreach { case (k8sName, testRInfo) =>
+      assert(limits(k8sName).getAmount === testRInfo.count)
+    }
 
     val driverPodMetadata = configuredPod.pod.getMetadata
     assert(driverPodMetadata.getName === "spark-driver-pod")
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
index 93268c6..3e892a9 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
@@ -26,11 +26,13 @@ import com.google.common.net.InternetDomainName
 import io.fabric8.kubernetes.api.model._
 import org.scalatest.BeforeAndAfter
 
-import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
+import org.apache.spark.{SecurityManager, SparkConf, SparkException, SparkFunSuite}
 import org.apache.spark.deploy.k8s.{KubernetesExecutorConf, KubernetesTestConf, SparkPod}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.features.KubernetesFeaturesTestUtils.TestResourceInformation
 import org.apache.spark.internal.config
+import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.Python._
 import org.apache.spark.rpc.RpcEndpointAddress
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
@@ -90,6 +92,58 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
       environment = environment)
   }
 
+  test("test spark resource missing vendor") {
+    val gpuResources = Map(("nvidia.com/gpu" -> TestResourceInformation("gpu", "2", "nvidia.com")))
+    // test missing vendor
+    gpuResources.foreach { case (_, testRInfo) =>
+      baseConf.set(
+        s"${SPARK_EXECUTOR_RESOURCE_PREFIX}${testRInfo.rName}${SPARK_RESOURCE_COUNT_SUFFIX}",
+        testRInfo.count)
+    }
+    val step = new BasicExecutorFeatureStep(newExecutorConf(), new SecurityManager(baseConf))
+    val error = intercept[SparkException] {
+      val executor = step.configurePod(SparkPod.initialPod())
+    }.getMessage()
+    assert(error.contains("Resource: gpu was requested, but vendor was not specified"))
+  }
+
+  test("test spark resource missing amount") {
+    val gpuResources = Map(("nvidia.com/gpu" -> TestResourceInformation("gpu", "2", "nvidia.com")))
+    // test missing count
+    gpuResources.foreach { case (_, testRInfo) =>
+      baseConf.set(
+        s"${SPARK_EXECUTOR_RESOURCE_PREFIX}${testRInfo.rName}${SPARK_RESOURCE_VENDOR_SUFFIX}",
+        testRInfo.vendor)
+    }
+    val step = new BasicExecutorFeatureStep(newExecutorConf(), new SecurityManager(baseConf))
+    val error = intercept[SparkException] {
+      val executor = step.configurePod(SparkPod.initialPod())
+    }.getMessage()
+    assert(error.contains("Resource: gpu was requested, but count was not specified"))
+  }
+
+  test("basic executor pod with resources") {
+    val gpuResources = Map(("nvidia.com/gpu" -> TestResourceInformation("gpu", "2", "nvidia.com")),
+      ("foo.com/fpga" -> TestResourceInformation("fpga", "f1", "foo.com")))
+    gpuResources.foreach { case (_, testRInfo) =>
+      baseConf.set(
+        s"${SPARK_EXECUTOR_RESOURCE_PREFIX}${testRInfo.rName}${SPARK_RESOURCE_COUNT_SUFFIX}",
+        testRInfo.count)
+      baseConf.set(
+        s"${SPARK_EXECUTOR_RESOURCE_PREFIX}${testRInfo.rName}${SPARK_RESOURCE_VENDOR_SUFFIX}",
+        testRInfo.vendor)
+    }
+    val step = new BasicExecutorFeatureStep(newExecutorConf(), new SecurityManager(baseConf))
+    val executor = step.configurePod(SparkPod.initialPod())
+
+    assert(executor.container.getResources.getLimits.size() === 3)
+    assert(executor.container.getResources
+      .getLimits.get("memory").getAmount === "1408Mi")
+    gpuResources.foreach { case (k8sName, testRInfo) =>
+      assert(executor.container.getResources.getLimits.get(k8sName).getAmount === testRInfo.count)
+    }
+  }
+
   test("basic executor pod has reasonable defaults") {
     val step = new BasicExecutorFeatureStep(newExecutorConf(), new SecurityManager(baseConf))
     val executor = step.configurePod(SparkPod.initialPod())
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KubernetesFeaturesTestUtils.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KubernetesFeaturesTestUtils.scala
index b0604ea..e8be3b0 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KubernetesFeaturesTestUtils.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KubernetesFeaturesTestUtils.scala
@@ -66,4 +66,6 @@ object KubernetesFeaturesTestUtils {
     val desired = implicitly[ClassTag[T]].runtimeClass
     list.filter(_.getClass() == desired).map(_.asInstanceOf[T])
   }
+
+  case class TestResourceInformation(rName: String, count: String, vendor: String)
 }
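
For reference, a minimal sketch (names and values are illustrative, and it
assumes SPARK_EXECUTOR_RESOURCE_PREFIX resolves to "spark.executor.resource.")
of how the new helpers compose:

    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.k8s.KubernetesUtils

    val conf = new SparkConf()
      .set("spark.executor.resource.fpga.count", "1")
      .set("spark.executor.resource.fpga.vendor", "xilinx.com")

    // buildResourcesQuantities resolves each configured resource to
    // vendor-domain/resource paired with a Quantity, e.g.
    // Map("xilinx.com/fpga" -> <Quantity: 1>); BasicExecutorFeatureStep
    // then adds these entries to the container's resource limits.
    val quantities =
      KubernetesUtils.buildResourcesQuantities("spark.executor.resource.", conf)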


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org