Posted to commits@spark.apache.org by do...@apache.org on 2022/03/01 18:43:04 UTC

[spark] branch master updated: [SPARK-38188][K8S] Support `spark.kubernetes.job.queue`

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new ccb8af6  [SPARK-38188][K8S] Support `spark.kubernetes.job.queue`
ccb8af6 is described below

commit ccb8af607672d2b2638554a1d4b003420292c0b2
Author: Yikun Jiang <yi...@gmail.com>
AuthorDate: Tue Mar 1 10:41:27 2022 -0800

    [SPARK-38188][K8S] Support `spark.kubernetes.job.queue`
    
    ### What changes were proposed in this pull request?
    This patch makes the following changes:
    - Add the `queue` configuration `spark.kubernetes.job.queue`.
    - Add the Volcano implementation of queue scheduling.
    - Add an integration test to make sure `queue` is set on the PodGroup, and to validate queue scheduling.
    
    ### Why are the changes needed?
    To support queue scheduling with the Volcano implementation.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, this introduces a new configuration, `spark.kubernetes.job.queue`.
    
    ### How was this patch tested?
    - Unit tests passed.
    - Spark on Kubernetes integration tests passed.
    
    Closes #35553 from Yikun/SPARK-38188.
    
    Authored-by: Yikun Jiang <yi...@gmail.com>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 docs/running-on-kubernetes.md                      |   9 ++
 .../scala/org/apache/spark/deploy/k8s/Config.scala |   7 +
 .../deploy/k8s/features/VolcanoFeatureStep.scala   |   8 +-
 .../k8s/features/VolcanoFeatureStepSuite.scala     |  11 ++
 .../volcano/disable-queue0-enable-queue1.yml       |  29 +++++
 .../volcano/enable-queue0-enable-queue1.yml        |  29 +++++
 .../k8s/integrationtest/KubernetesSuite.scala      |  19 ++-
 .../k8s/integrationtest/VolcanoTestsSuite.scala    | 142 ++++++++++++++++++++-
 8 files changed, 242 insertions(+), 12 deletions(-)

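For context, a minimal sketch of how an application could opt into Volcano
queue scheduling once this lands. The scheduler and feature-step keys are the
same ones the integration tests below set; the app name and queue name are
placeholders:

    import org.apache.spark.sql.SparkSession

    // Sketch only: run on the Volcano scheduler, wire the feature step into
    // both driver and executor pods, and target the new queue config.
    val spark = SparkSession.builder()
      .appName("queue-scheduling-demo")
      .config("spark.kubernetes.scheduler.name", "volcano")
      .config("spark.kubernetes.driver.pod.featureSteps",
        "org.apache.spark.deploy.k8s.features.VolcanoFeatureStep")
      .config("spark.kubernetes.executor.pod.featureSteps",
        "org.apache.spark.deploy.k8s.features.VolcanoFeatureStep")
      .config("spark.kubernetes.job.queue", "queue1") // new in this commit
      .getOrCreate()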
diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index ee0b230..8553d78 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -1357,6 +1357,15 @@ See the [configuration page](configuration.html) for information on Spark config
   <td>3.3.0</td>
 </tr>
 <tr>
+  <td><code>spark.kubernetes.job.queue</code></td>
+  <td>(none)</td>
+  <td>
+    The name of the queue to which the job is submitted. This info will be stored in the
+    configuration and passed to the specific feature step.
+  </td>
+  <td>3.3.0</td>
+</tr>
+<tr>
   <td><code>spark.kubernetes.configMap.maxSize</code></td>
   <td><code>1572864</code></td>
   <td>
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index 91bbb41..58a4a78 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -292,6 +292,13 @@ private[spark] object Config extends Logging {
       .stringConf
       .createOptional
 
+  val KUBERNETES_JOB_QUEUE = ConfigBuilder("spark.kubernetes.job.queue")
+    .doc("The name of the queue to which the job is submitted. This info " +
+      "will be stored in configuration and passed to specific feature step.")
+    .version("3.3.0")
+    .stringConf
+    .createOptional
+
   val KUBERNETES_EXECUTOR_REQUEST_CORES =
     ConfigBuilder("spark.kubernetes.executor.request.cores")
       .doc("Specify the cpu request for each executor pod")
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/VolcanoFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/VolcanoFeatureStep.scala
index 1c93684..c6efe4d 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/VolcanoFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/VolcanoFeatureStep.scala
@@ -20,6 +20,7 @@ import io.fabric8.kubernetes.api.model._
 import io.fabric8.volcano.scheduling.v1beta1.PodGroupBuilder
 
 import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverConf, KubernetesExecutorConf, SparkPod}
+import org.apache.spark.deploy.k8s.Config._
 
 private[spark] class VolcanoFeatureStep extends KubernetesDriverCustomFeatureConfigStep
   with KubernetesExecutorCustomFeatureConfigStep {
@@ -30,6 +31,7 @@ private[spark] class VolcanoFeatureStep extends KubernetesDriverCustomFeatureCon
 
   private lazy val podGroupName = s"${kubernetesConf.appId}-podgroup"
   private lazy val namespace = kubernetesConf.namespace
+  private lazy val queue = kubernetesConf.get(KUBERNETES_JOB_QUEUE)
 
   override def init(config: KubernetesDriverConf): Unit = {
     kubernetesConf = config
@@ -45,8 +47,10 @@ private[spark] class VolcanoFeatureStep extends KubernetesDriverCustomFeatureCon
         .withName(podGroupName)
         .withNamespace(namespace)
       .endMetadata()
-      .build()
-    Seq(podGroup)
+
+    queue.foreach(podGroup.editOrNewSpec().withQueue(_).endSpec())
+
+    Seq(podGroup.build())
   }
 
   override def configurePod(pod: SparkPod): SparkPod = {
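The change above defers `build()` so the PodGroup spec is only materialized
when a queue was configured. A sketch of the resulting shape, assuming a
hypothetical app id and `queue = Some("queue1")`:

    import io.fabric8.volcano.scheduling.v1beta1.PodGroupBuilder

    // Mirrors getAdditionalPreKubernetesResources(): metadata is always set,
    // spec.queue only when the user configured a queue.
    val podGroup = new PodGroupBuilder()
      .editOrNewMetadata()
        .withName("spark-abc123-podgroup") // hypothetical appId-podgroup name
        .withNamespace("default")
      .endMetadata()

    Option("queue1").foreach(podGroup.editOrNewSpec().withQueue(_).endSpec())
    assert(podGroup.build().getSpec.getQueue == "queue1")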
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/VolcanoFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/VolcanoFeatureStepSuite.scala
index cf337f9..eda1ccc 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/VolcanoFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/VolcanoFeatureStepSuite.scala
@@ -20,6 +20,7 @@ import io.fabric8.volcano.scheduling.v1beta1.PodGroup
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.k8s._
+import org.apache.spark.deploy.k8s.Config._
 
 class VolcanoFeatureStepSuite extends SparkFunSuite {
 
@@ -37,6 +38,16 @@ class VolcanoFeatureStepSuite extends SparkFunSuite {
     assert(podGroup.getMetadata.getName === s"${kubernetesConf.appId}-podgroup")
   }
 
+  test("SPARK-38818: Support `spark.kubernetes.job.queue`") {
+    val sparkConf = new SparkConf()
+      .set(KUBERNETES_JOB_QUEUE.key, "queue1")
+    val kubernetesConf = KubernetesTestConf.createDriverConf(sparkConf)
+    val step = new VolcanoFeatureStep()
+    step.init(kubernetesConf)
+    val podGroup = step.getAdditionalPreKubernetesResources().head.asInstanceOf[PodGroup]
+    assert(podGroup.getSpec.getQueue === "queue1")
+  }
+
   test("SPARK-36061: Executor Pod with Volcano PodGroup") {
     val sparkConf = new SparkConf()
     val kubernetesConf = KubernetesTestConf.createExecutorConf(sparkConf)
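A complementary unit check (a sketch, not part of this commit, reusing the
suite's imports) would pin down the unset case, where `queue.foreach` is a
no-op and no spec is created at all:

    test("PodGroup has no queue when spark.kubernetes.job.queue is unset") {
      val kubernetesConf = KubernetesTestConf.createDriverConf(new SparkConf())
      val step = new VolcanoFeatureStep()
      step.init(kubernetesConf)
      val podGroup = step.getAdditionalPreKubernetesResources()
        .head.asInstanceOf[PodGroup]
      assert(podGroup.getSpec == null || podGroup.getSpec.getQueue == null)
    }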
diff --git a/resource-managers/kubernetes/integration-tests/src/test/resources/volcano/disable-queue0-enable-queue1.yml b/resource-managers/kubernetes/integration-tests/src/test/resources/volcano/disable-queue0-enable-queue1.yml
new file mode 100644
index 0000000..2281e2e
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/src/test/resources/volcano/disable-queue0-enable-queue1.yml
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+apiVersion: scheduling.volcano.sh/v1beta1
+kind: Queue
+metadata:
+  name: queue0
+spec:
+  weight: 0
+---
+apiVersion: scheduling.volcano.sh/v1beta1
+kind: Queue
+metadata:
+  name: queue1
+spec:
+  weight: 1
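For reference, the `weight` values are what make these fixtures act as
"disabled" and "enabled": with weight 0, queue0 receives no share of cluster
capacity, so drivers submitted to it should stay Pending, while queue1
(weight 1) schedules normally. The integration tests below assert exactly
that split.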
diff --git a/resource-managers/kubernetes/integration-tests/src/test/resources/volcano/enable-queue0-enable-queue1.yml b/resource-managers/kubernetes/integration-tests/src/test/resources/volcano/enable-queue0-enable-queue1.yml
new file mode 100644
index 0000000..aadeb28
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/src/test/resources/volcano/enable-queue0-enable-queue1.yml
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+apiVersion: scheduling.volcano.sh/v1beta1
+kind: Queue
+metadata:
+  name: queue0
+spec:
+  weight: 1
+---
+apiVersion: scheduling.volcano.sh/v1beta1
+kind: Queue
+metadata:
+  name: queue1
+spec:
+  weight: 1
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
index 69b7369..ca7eae1 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
@@ -212,7 +212,9 @@ class KubernetesSuite extends SparkFunSuite
       driverPodChecker: Pod => Unit = doBasicDriverPodCheck,
       executorPodChecker: Pod => Unit = doBasicExecutorPodCheck,
       appArgs: Array[String] = Array.empty[String],
-      isJVM: Boolean = true ): Unit = {
+      isJVM: Boolean = true,
+      customSparkConf: Option[SparkAppConf] = None,
+      customAppLocator: Option[String] = None): Unit = {
     runSparkApplicationAndVerifyCompletion(
       appResource,
       SPARK_PI_MAIN_CLASS,
@@ -221,7 +223,10 @@ class KubernetesSuite extends SparkFunSuite
       appArgs,
       driverPodChecker,
       executorPodChecker,
-      isJVM)
+      isJVM,
+      customSparkConf = customSparkConf,
+      customAppLocator = customAppLocator
+    )
   }
 
   protected def runDFSReadWriteAndVerifyCompletion(
@@ -336,7 +341,9 @@ class KubernetesSuite extends SparkFunSuite
       pyFiles: Option[String] = None,
       executorPatience: Option[(Option[Interval], Option[Timeout])] = None,
       decommissioningTest: Boolean = false,
-      env: Map[String, String] = Map.empty[String, String]): Unit = {
+      env: Map[String, String] = Map.empty[String, String],
+      customSparkConf: Option[SparkAppConf] = None,
+      customAppLocator: Option[String] = None): Unit = {
 
   // scalastyle:on argcount
     val appArguments = SparkAppArguments(
@@ -370,7 +377,7 @@ class KubernetesSuite extends SparkFunSuite
 
     val execWatcher = kubernetesTestComponents.kubernetesClient
       .pods()
-      .withLabel("spark-app-locator", appLocator)
+      .withLabel("spark-app-locator", customAppLocator.getOrElse(appLocator))
       .withLabel("spark-role", "executor")
       .watch(new Watcher[Pod] {
         logDebug("Beginning watch of executors")
@@ -434,7 +441,7 @@ class KubernetesSuite extends SparkFunSuite
     logDebug("Starting Spark K8s job")
     SparkAppLauncher.launch(
       appArguments,
-      sparkAppConf,
+      customSparkConf.getOrElse(sparkAppConf),
       TIMEOUT.value.toSeconds.toInt,
       sparkHomeDir,
       isJVM,
@@ -443,7 +450,7 @@ class KubernetesSuite extends SparkFunSuite
 
     val driverPod = kubernetesTestComponents.kubernetesClient
       .pods()
-      .withLabel("spark-app-locator", appLocator)
+      .withLabel("spark-app-locator", customAppLocator.getOrElse(appLocator))
       .withLabel("spark-role", "driver")
       .list()
       .getItems
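The two new optional parameters exist because the queue tests launch several
applications concurrently: each run needs its own SparkAppConf and its own
"spark-app-locator" label value, otherwise the watches above would match pods
from sibling jobs. A sketch of the call shape (mirroring runJobAndVerify in
the next file; `conf` and `appLoc` are per-job values):

    runSparkPiAndVerifyCompletion(
      driverPodChecker = doBasicDriverPodCheck,
      executorPodChecker = doBasicExecutorPodCheck,
      customSparkConf = Option(conf),    // per-job SparkAppConf
      customAppLocator = Option(appLoc)  // per-job app-locator label value
    )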
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/VolcanoTestsSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/VolcanoTestsSuite.scala
index 377a1b8..7ffd28b 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/VolcanoTestsSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/VolcanoTestsSuite.scala
@@ -16,16 +16,34 @@
  */
 package org.apache.spark.deploy.k8s.integrationtest
 
+import java.io.{File, FileInputStream}
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+// scalastyle:off executioncontextglobal
+import scala.concurrent.ExecutionContext.Implicits.global
+// scalastyle:on executioncontextglobal
+import scala.concurrent.Future
+
 import io.fabric8.kubernetes.api.model.Pod
+import io.fabric8.kubernetes.client.NamespacedKubernetesClient
 import io.fabric8.volcano.client.VolcanoClient
+import org.scalatest.concurrent.Eventually
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.features.VolcanoFeatureStep
-import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite.k8sTestTag
-import org.apache.spark.deploy.k8s.integrationtest.VolcanoSuite.volcanoTag
+import org.apache.spark.internal.config.NETWORK_AUTH_ENABLED
 
 private[spark] trait VolcanoTestsSuite { k8sSuite: KubernetesSuite =>
   import VolcanoTestsSuite._
+  import org.apache.spark.deploy.k8s.integrationtest.VolcanoSuite.volcanoTag
+  import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite.{k8sTestTag, INTERVAL, TIMEOUT}
+
+  lazy val volcanoClient: VolcanoClient
+    = kubernetesTestComponents.kubernetesClient.adapt(classOf[VolcanoClient])
+  lazy val k8sClient: NamespacedKubernetesClient = kubernetesTestComponents.kubernetesClient
 
   protected def checkScheduler(pod: Pod): Unit = {
     assert(pod.getSpec.getSchedulerName === "volcano")
@@ -37,12 +55,81 @@ private[spark] trait VolcanoTestsSuite { k8sSuite: KubernetesSuite =>
     assert(annotations.get("scheduling.k8s.io/group-name") === s"$appId-podgroup")
   }
 
-  protected def checkPodGroup(pod: Pod): Unit = {
+  protected def checkPodGroup(
+      pod: Pod,
+      queue: Option[String] = None): Unit = {
     val appId = pod.getMetadata.getLabels.get("spark-app-selector")
     val podGroupName = s"$appId-podgroup"
-    val volcanoClient = kubernetesTestComponents.kubernetesClient.adapt(classOf[VolcanoClient])
     val podGroup = volcanoClient.podGroups().withName(podGroupName).get()
     assert(podGroup.getMetadata.getOwnerReferences.get(0).getName === pod.getMetadata.getName)
+    queue.foreach(q => assert(q === podGroup.getSpec.getQueue))
+  }
+
+  private def createOrReplaceYAMLResource(yamlPath: String): Unit = {
+    k8sClient.load(new FileInputStream(yamlPath)).createOrReplace()
+  }
+
+  private def deleteYAMLResource(yamlPath: String): Unit = {
+    k8sClient.load(new FileInputStream(yamlPath)).delete()
+  }
+
+  private def getPods(
+      role: String,
+      groupLocator: String,
+      statusPhase: String): mutable.Buffer[Pod] = {
+    k8sClient
+      .pods()
+      .withLabel("spark-group-locator", groupLocator)
+      .withLabel("spark-role", role)
+      .withField("status.phase", statusPhase)
+      .list()
+      .getItems.asScala
+  }
+
+  def runJobAndVerify(
+      batchSuffix: String,
+      groupLoc: Option[String] = None,
+      queue: Option[String] = None): Unit = {
+    val appLoc = s"${appLocator}${batchSuffix}"
+    val podName = s"${driverPodName}-${batchSuffix}"
+    // create new configuration for every job
+    val conf = createVolcanoSparkConf(podName, appLoc, groupLoc, queue)
+    runSparkPiAndVerifyCompletion(
+      driverPodChecker = (driverPod: Pod) => {
+        checkScheduler(driverPod)
+        checkAnnotaion(driverPod)
+        checkPodGroup(driverPod, queue)
+      },
+      executorPodChecker = (executorPod: Pod) => {
+        checkScheduler(executorPod)
+        checkAnnotaion(executorPod)
+      },
+      customSparkConf = Option(conf),
+      customAppLocator = Option(appLoc)
+    )
+  }
+
+  private def createVolcanoSparkConf(
+      driverPodName: String = driverPodName,
+      appLoc: String = appLocator,
+      groupLoc: Option[String] = None,
+      queue: Option[String] = None): SparkAppConf = {
+    val conf = kubernetesTestComponents.newSparkAppConf()
+      .set(CONTAINER_IMAGE.key, image)
+      .set(KUBERNETES_DRIVER_POD_NAME.key, driverPodName)
+      .set(s"${KUBERNETES_DRIVER_LABEL_PREFIX}spark-app-locator", appLoc)
+      .set(s"${KUBERNETES_EXECUTOR_LABEL_PREFIX}spark-app-locator", appLoc)
+      .set(NETWORK_AUTH_ENABLED.key, "true")
+      // below is volcano specific configuration
+      .set(KUBERNETES_SCHEDULER_NAME.key, "volcano")
+      .set(KUBERNETES_DRIVER_POD_FEATURE_STEPS.key, VOLCANO_FEATURE_STEP)
+      .set(KUBERNETES_EXECUTOR_POD_FEATURE_STEPS.key, VOLCANO_FEATURE_STEP)
+    queue.foreach(conf.set(KUBERNETES_JOB_QUEUE.key, _))
+    groupLoc.foreach { locator =>
+      conf.set(s"${KUBERNETES_DRIVER_LABEL_PREFIX}spark-group-locator", locator)
+      conf.set(s"${KUBERNETES_EXECUTOR_LABEL_PREFIX}spark-group-locator", locator)
+    }
+    conf
   }
 
   test("Run SparkPi with volcano scheduler", k8sTestTag, volcanoTag) {
@@ -63,8 +150,55 @@ private[spark] trait VolcanoTestsSuite { k8sSuite: KubernetesSuite =>
       }
     )
   }
+
+  test("SPARK-38188: Run SparkPi jobs with 2 queues (only 1 enable)", k8sTestTag, volcanoTag) {
+    // Disabled queue0 and enabled queue1
+    createOrReplaceYAMLResource(VOLCANO_Q0_DISABLE_Q1_ENABLE_YAML)
+    // Submit jobs into disabled queue0 and enabled queue1
+    val jobNum = 4
+    (1 to jobNum).foreach { i =>
+      Future {
+        val queueName = s"queue${i % 2}"
+        runJobAndVerify(i.toString, Option(s"$GROUP_PREFIX-$queueName"), Option(queueName))
+      }
+    }
+    // There are two `Succeeded` jobs and two `Pending` jobs
+    Eventually.eventually(TIMEOUT, INTERVAL) {
+      val completedPods = getPods("driver", s"$GROUP_PREFIX-queue1", "Succeeded")
+      assert(completedPods.size === 2)
+      val pendingPods = getPods("driver", s"$GROUP_PREFIX-queue0", "Pending")
+      assert(pendingPods.size === 2)
+    }
+    deleteYAMLResource(VOLCANO_Q0_DISABLE_Q1_ENABLE_YAML)
+  }
+
+  test("SPARK-38188: Run SparkPi jobs with 2 queues (all enable)", k8sTestTag, volcanoTag) {
+    // Enable all queues
+    createOrReplaceYAMLResource(VOLCANO_ENABLE_Q0_AND_Q1_YAML)
+    val jobNum = 4
+    // Submit jobs into these two queues
+    (1 to jobNum).foreach { i =>
+      Future {
+        val queueName = s"queue${i % 2}"
+        runJobAndVerify(i.toString, Option(s"$GROUP_PREFIX"), Option(queueName))
+      }
+    }
+    // All jobs "Succeeded"
+    Eventually.eventually(TIMEOUT, INTERVAL) {
+      val completedPods = getPods("driver", GROUP_PREFIX, "Succeeded")
+      assert(completedPods.size === jobNum)
+    }
+    deleteYAMLResource(VOLCANO_ENABLE_Q0_AND_Q1_YAML)
+  }
 }
 
 private[spark] object VolcanoTestsSuite extends SparkFunSuite {
   val VOLCANO_FEATURE_STEP = classOf[VolcanoFeatureStep].getName
+  val VOLCANO_ENABLE_Q0_AND_Q1_YAML = new File(
+    getClass.getResource("/volcano/enable-queue0-enable-queue1.yml").getFile
+  ).getAbsolutePath
+  val VOLCANO_Q0_DISABLE_Q1_ENABLE_YAML = new File(
+    getClass.getResource("/volcano/disable-queue0-enable-queue1.yml").getFile
+  ).getAbsolutePath
+  val GROUP_PREFIX = "volcano-test" + UUID.randomUUID().toString.replaceAll("-", "")
 }
