You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2021/10/14 06:47:56 UTC

[spark] branch master updated: [SPARK-36059][K8S] Support `spark.kubernetes.driver.scheduler.name`

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 4a722dc  [SPARK-36059][K8S] Support `spark.kubernetes.driver.scheduler.name`
4a722dc is described below

commit 4a722dcdd88e3969532d87f83949bdda55082d60
Author: Yikun Jiang <yi...@gmail.com>
AuthorDate: Wed Oct 13 23:47:14 2021 -0700

    [SPARK-36059][K8S] Support `spark.kubernetes.driver.scheduler.name`
    
    ### What changes were proposed in this pull request?
    This patch adds support for selecting the driver pod's scheduler through `schedulerName`.
    
    ### Why are the changes needed?
    The ability to specify a scheduler was already added on the executor side in https://github.com/apache/spark/pull/26088. In some scenarios, users also want to specify the driver's scheduler so that the driver pod can be scheduled separately.
    
    Part of [SPARK-36057](https://issues.apache.org/jira/browse/SPARK-36057).
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, it adds the `spark.kubernetes.driver.scheduler.name` configuration.
    
    ### How was this patch tested?
    - UT
    
    Closes #34239 from Yikun/SPARK-36059-sch.
    
    Authored-by: Yikun Jiang <yi...@gmail.com>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 docs/running-on-kubernetes.md                            | 16 ++++++++++++++++
 .../main/scala/org/apache/spark/deploy/k8s/Config.scala  |  7 +++++++
 .../org/apache/spark/deploy/k8s/KubernetesConf.scala     |  5 +++++
 .../deploy/k8s/features/BasicDriverFeatureStep.scala     |  3 +++
 .../apache/spark/deploy/k8s/KubernetesConfSuite.scala    | 15 +++++++++++++++
 5 files changed, 46 insertions(+)

diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index b30d61d..d32861b 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -1306,6 +1306,22 @@ See the [configuration page](configuration.html) for information on Spark config
   </td>
   <td>3.3.0</td>
 </tr>
+<tr>
+  <td><code>spark.kubernetes.executor.scheduler.name</code></td>
+  <td>(none)</td>
+  <td>
+    Specify the scheduler name for each executor pod.
+  </td>
+  <td>3.0.0</td>
+</tr>
+<tr>
+  <td><code>spark.kubernetes.driver.scheduler.name</code></td>
+  <td>(none)</td>
+  <td>
+    Specify the scheduler name for driver pod.
+  </td>
+  <td>3.3.0</td>
+</tr>
 </table>
 
 #### Pod template properties
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index 2aa4fbc..2458e2d 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -228,6 +228,13 @@ private[spark] object Config extends Logging {
       .stringConf
       .createOptional
 
+  val KUBERNETES_DRIVER_SCHEDULER_NAME =
+    ConfigBuilder("spark.kubernetes.driver.scheduler.name")
+      .doc("Specify the scheduler name for driver pod")
+      .version("3.3.0")
+      .stringConf
+      .createOptional
+
   val KUBERNETES_EXECUTOR_REQUEST_CORES =
     ConfigBuilder("spark.kubernetes.executor.request.cores")
       .doc("Specify the cpu request for each executor pod")
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
index 8f84555..0eef6e1 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
@@ -41,6 +41,7 @@ private[spark] abstract class KubernetesConf(val sparkConf: SparkConf) {
   def secretEnvNamesToKeyRefs: Map[String, String]
   def secretNamesToMountPaths: Map[String, String]
   def volumes: Seq[KubernetesVolumeSpec]
+  def schedulerName: String
 
   def appName: String = get("spark.app.name", "spark")
 
@@ -130,6 +131,8 @@ private[spark] class KubernetesDriverConf(
   override def volumes: Seq[KubernetesVolumeSpec] = {
     KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, KUBERNETES_DRIVER_VOLUMES_PREFIX)
   }
+
+  override def schedulerName: String = get(KUBERNETES_DRIVER_SCHEDULER_NAME).getOrElse("")
 }
 
 private[spark] class KubernetesExecutorConf(
@@ -186,6 +189,8 @@ private[spark] class KubernetesExecutorConf(
     KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, KUBERNETES_EXECUTOR_VOLUMES_PREFIX)
   }
 
+  override def schedulerName: String = get(KUBERNETES_EXECUTOR_SCHEDULER_NAME).getOrElse("")
+
   private def checkExecutorEnvKey(key: String): Boolean = {
     // Pattern for matching an executorEnv key, which meets certain naming rules.
     val executorEnvRegex = "[-._a-zA-Z][-._a-zA-Z0-9]*".r
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
index 90cdf71..925f9dc 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
@@ -152,6 +152,9 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
         .endSpec()
       .build()
 
+    conf.get(KUBERNETES_DRIVER_SCHEDULER_NAME)
+      .foreach(driverPod.getSpec.setSchedulerName)
+
     SparkPod(driverPod, driverContainer)
   }
 
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala
index 8a1d0a3..f5cfd8e 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala
@@ -198,4 +198,19 @@ class KubernetesConfSuite extends SparkFunSuite {
     assert(driverConf.nodeSelector === CUSTOM_NODE_SELECTOR)
     assert(driverConf.driverNodeSelector === CUSTOM_DRIVER_NODE_SELECTOR)
   }
+
+  test("SPARK-36059: Set driver.scheduler and executor.scheduler") {
+    val sparkConf = new SparkConf(false)
+    val execUnsetConf = KubernetesTestConf.createExecutorConf(sparkConf)
+    val driverUnsetConf = KubernetesTestConf.createDriverConf(sparkConf)
+    assert(execUnsetConf.schedulerName === "")  // unset falls back to ""
+    assert(driverUnsetConf.schedulerName === "")  // unset falls back to ""
+
+    sparkConf.set(KUBERNETES_DRIVER_SCHEDULER_NAME, "driverScheduler")
+    sparkConf.set(KUBERNETES_EXECUTOR_SCHEDULER_NAME, "executorScheduler")
+    val execConf = KubernetesTestConf.createExecutorConf(sparkConf)
+    assert(execConf.schedulerName === "executorScheduler")
+    val driverConf = KubernetesTestConf.createDriverConf(sparkConf)
+    assert(driverConf.schedulerName === "driverScheduler")
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org