Posted to commits@spark.apache.org by do...@apache.org on 2022/03/01 18:43:04 UTC
[spark] branch master updated: [SPARK-38188][K8S] Support `spark.kubernetes.job.queue`
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new ccb8af6 [SPARK-38188][K8S] Support `spark.kubernetes.job.queue`
ccb8af6 is described below
commit ccb8af607672d2b2638554a1d4b003420292c0b2
Author: Yikun Jiang <yi...@gmail.com>
AuthorDate: Tue Mar 1 10:41:27 2022 -0800
[SPARK-38188][K8S] Support `spark.kubernetes.job.queue`
### What changes were proposed in this pull request?
This patch makes the following changes:
- Add the `queue` configuration `spark.kubernetes.job.queue`
- Add the Volcano implementation of queue scheduling
- Add an integration test to make sure `queue` is set on the PodGroup, and to validate queue scheduling.
### Why are the changes needed?
Support queue scheduling with the Volcano implementation.
### Does this PR introduce _any_ user-facing change?
Yes, this introduces a new configuration, `spark.kubernetes.job.queue`.
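For illustration, a minimal sketch of how a submitter might combine the new option with the existing Volcano-related settings (the scheduler-name and feature-step keys already exist in Spark; `"queue1"` is a placeholder Volcano `Queue` that must already exist in the cluster):
```scala
import org.apache.spark.SparkConf

object VolcanoQueueConfSketch {
  // Sketch only: wires the Volcano scheduler, the Volcano feature step, and
  // the new queue option together on a SparkConf for a K8s submission.
  def volcanoConf(): SparkConf = new SparkConf()
    .set("spark.kubernetes.scheduler.name", "volcano")
    .set("spark.kubernetes.driver.pod.featureSteps",
      "org.apache.spark.deploy.k8s.features.VolcanoFeatureStep")
    .set("spark.kubernetes.executor.pod.featureSteps",
      "org.apache.spark.deploy.k8s.features.VolcanoFeatureStep")
    .set("spark.kubernetes.job.queue", "queue1")
}
```
The same keys can equally be passed as `--conf` options to `spark-submit`; the Scala form is shown only to keep the example in one language with the rest of the patch.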
### How was this patch tested?
- UT passed.
- Spark on Kubernetes Integration test passed.
Closes #35553 from Yikun/SPARK-38188.
Authored-by: Yikun Jiang <yi...@gmail.com>
Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
docs/running-on-kubernetes.md | 9 ++
.../scala/org/apache/spark/deploy/k8s/Config.scala | 7 +
.../deploy/k8s/features/VolcanoFeatureStep.scala | 8 +-
.../k8s/features/VolcanoFeatureStepSuite.scala | 11 ++
.../volcano/disable-queue0-enable-queue1.yml | 29 +++++
.../volcano/enable-queue0-enable-queue1.yml | 29 +++++
.../k8s/integrationtest/KubernetesSuite.scala | 19 ++-
.../k8s/integrationtest/VolcanoTestsSuite.scala | 142 ++++++++++++++++++++-
8 files changed, 242 insertions(+), 12 deletions(-)
diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index ee0b230..8553d78 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -1357,6 +1357,15 @@ See the [configuration page](configuration.html) for information on Spark config
<td>3.3.0</td>
</tr>
<tr>
+ <td><code>spark.kubernetes.job.queue</code></td>
+ <td>(none)</td>
+ <td>
+ The name of the queue to which the job is submitted. This information is stored in the
+ configuration and passed to the specific feature step.
+ </td>
+ <td>3.3.0</td>
+</tr>
+<tr>
<td><code>spark.kubernetes.configMap.maxSize</code></td>
<td><code>1572864</code></td>
<td>
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index 91bbb41..58a4a78 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -292,6 +292,13 @@ private[spark] object Config extends Logging {
.stringConf
.createOptional
+ val KUBERNETES_JOB_QUEUE = ConfigBuilder("spark.kubernetes.job.queue")
+ .doc("The name of the queue to which the job is submitted. This info " +
+ "will be stored in configuration and passed to specific feature step.")
+ .version("3.3.0")
+ .stringConf
+ .createOptional
+
val KUBERNETES_EXECUTOR_REQUEST_CORES =
ConfigBuilder("spark.kubernetes.executor.request.cores")
.doc("Specify the cpu request for each executor pod")
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/VolcanoFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/VolcanoFeatureStep.scala
index 1c93684..c6efe4d 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/VolcanoFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/VolcanoFeatureStep.scala
@@ -20,6 +20,7 @@ import io.fabric8.kubernetes.api.model._
import io.fabric8.volcano.scheduling.v1beta1.PodGroupBuilder
import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverConf, KubernetesExecutorConf, SparkPod}
+import org.apache.spark.deploy.k8s.Config._
private[spark] class VolcanoFeatureStep extends KubernetesDriverCustomFeatureConfigStep
with KubernetesExecutorCustomFeatureConfigStep {
@@ -30,6 +31,7 @@ private[spark] class VolcanoFeatureStep extends KubernetesDriverCustomFeatureCon
private lazy val podGroupName = s"${kubernetesConf.appId}-podgroup"
private lazy val namespace = kubernetesConf.namespace
+ private lazy val queue = kubernetesConf.get(KUBERNETES_JOB_QUEUE)
override def init(config: KubernetesDriverConf): Unit = {
kubernetesConf = config
@@ -45,8 +47,10 @@ private[spark] class VolcanoFeatureStep extends KubernetesDriverCustomFeatureCon
.withName(podGroupName)
.withNamespace(namespace)
.endMetadata()
- .build()
- Seq(podGroup)
+
+ queue.foreach(podGroup.editOrNewSpec().withQueue(_).endSpec())
+
+ Seq(podGroup.build())
}
override def configurePod(pod: SparkPod): SparkPod = {
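For readers unfamiliar with the fabric8 builder calls above, here is a standalone sketch of the same pattern, assuming only the `io.fabric8` Volcano model classes on the classpath (`PodGroupQueueSketch` and its arguments are illustrative, not part of the patch):
```scala
import io.fabric8.volcano.scheduling.v1beta1.{PodGroup, PodGroupBuilder}

object PodGroupQueueSketch {
  // Mirrors the step above: metadata is always set, while the spec's queue
  // field is filled in only when a queue name was actually provided.
  def buildPodGroup(appId: String, namespace: String, queue: Option[String]): PodGroup = {
    val builder = new PodGroupBuilder()
      .editOrNewMetadata()
        .withName(s"$appId-podgroup")
        .withNamespace(namespace)
      .endMetadata()
    queue.foreach { q => builder.editOrNewSpec().withQueue(q).endSpec() }
    builder.build()
  }

  def main(args: Array[String]): Unit = {
    val podGroup = buildPodGroup("spark-app-123", "default", Some("queue1"))
    println(podGroup.getSpec.getQueue) // prints: queue1
  }
}
```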
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/VolcanoFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/VolcanoFeatureStepSuite.scala
index cf337f9..eda1ccc 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/VolcanoFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/VolcanoFeatureStepSuite.scala
@@ -20,6 +20,7 @@ import io.fabric8.volcano.scheduling.v1beta1.PodGroup
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s._
+import org.apache.spark.deploy.k8s.Config._
class VolcanoFeatureStepSuite extends SparkFunSuite {
@@ -37,6 +38,16 @@ class VolcanoFeatureStepSuite extends SparkFunSuite {
assert(podGroup.getMetadata.getName === s"${kubernetesConf.appId}-podgroup")
}
+ test("SPARK-38818: Support `spark.kubernetes.job.queue`") {
+ val sparkConf = new SparkConf()
+ .set(KUBERNETES_JOB_QUEUE.key, "queue1")
+ val kubernetesConf = KubernetesTestConf.createDriverConf(sparkConf)
+ val step = new VolcanoFeatureStep()
+ step.init(kubernetesConf)
+ val podGroup = step.getAdditionalPreKubernetesResources().head.asInstanceOf[PodGroup]
+ assert(podGroup.getSpec.getQueue === "queue1")
+ }
+
test("SPARK-36061: Executor Pod with Volcano PodGroup") {
val sparkConf = new SparkConf()
val kubernetesConf = KubernetesTestConf.createExecutorConf(sparkConf)
diff --git a/resource-managers/kubernetes/integration-tests/src/test/resources/volcano/disable-queue0-enable-queue1.yml b/resource-managers/kubernetes/integration-tests/src/test/resources/volcano/disable-queue0-enable-queue1.yml
new file mode 100644
index 0000000..2281e2e
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/src/test/resources/volcano/disable-queue0-enable-queue1.yml
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+apiVersion: scheduling.volcano.sh/v1beta1
+kind: Queue
+metadata:
+ name: queue0
+spec:
+ weight: 0
+---
+apiVersion: scheduling.volcano.sh/v1beta1
+kind: Queue
+metadata:
+ name: queue1
+spec:
+ weight: 1
diff --git a/resource-managers/kubernetes/integration-tests/src/test/resources/volcano/enable-queue0-enable-queue1.yml b/resource-managers/kubernetes/integration-tests/src/test/resources/volcano/enable-queue0-enable-queue1.yml
new file mode 100644
index 0000000..aadeb28
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/src/test/resources/volcano/enable-queue0-enable-queue1.yml
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+apiVersion: scheduling.volcano.sh/v1beta1
+kind: Queue
+metadata:
+ name: queue0
+spec:
+ weight: 1
+---
+apiVersion: scheduling.volcano.sh/v1beta1
+kind: Queue
+metadata:
+ name: queue1
+spec:
+ weight: 1
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
index 69b7369..ca7eae1 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
@@ -212,7 +212,9 @@ class KubernetesSuite extends SparkFunSuite
driverPodChecker: Pod => Unit = doBasicDriverPodCheck,
executorPodChecker: Pod => Unit = doBasicExecutorPodCheck,
appArgs: Array[String] = Array.empty[String],
- isJVM: Boolean = true ): Unit = {
+ isJVM: Boolean = true,
+ customSparkConf: Option[SparkAppConf] = None,
+ customAppLocator: Option[String] = None): Unit = {
runSparkApplicationAndVerifyCompletion(
appResource,
SPARK_PI_MAIN_CLASS,
@@ -221,7 +223,10 @@ class KubernetesSuite extends SparkFunSuite
appArgs,
driverPodChecker,
executorPodChecker,
- isJVM)
+ isJVM,
+ customSparkConf = customSparkConf,
+ customAppLocator = customAppLocator
+ )
}
protected def runDFSReadWriteAndVerifyCompletion(
@@ -336,7 +341,9 @@ class KubernetesSuite extends SparkFunSuite
pyFiles: Option[String] = None,
executorPatience: Option[(Option[Interval], Option[Timeout])] = None,
decommissioningTest: Boolean = false,
- env: Map[String, String] = Map.empty[String, String]): Unit = {
+ env: Map[String, String] = Map.empty[String, String],
+ customSparkConf: Option[SparkAppConf] = None,
+ customAppLocator: Option[String] = None): Unit = {
// scalastyle:on argcount
val appArguments = SparkAppArguments(
@@ -370,7 +377,7 @@ class KubernetesSuite extends SparkFunSuite
val execWatcher = kubernetesTestComponents.kubernetesClient
.pods()
- .withLabel("spark-app-locator", appLocator)
+ .withLabel("spark-app-locator", customAppLocator.getOrElse(appLocator))
.withLabel("spark-role", "executor")
.watch(new Watcher[Pod] {
logDebug("Beginning watch of executors")
@@ -434,7 +441,7 @@ class KubernetesSuite extends SparkFunSuite
logDebug("Starting Spark K8s job")
SparkAppLauncher.launch(
appArguments,
- sparkAppConf,
+ customSparkConf.getOrElse(sparkAppConf),
TIMEOUT.value.toSeconds.toInt,
sparkHomeDir,
isJVM,
@@ -443,7 +450,7 @@ class KubernetesSuite extends SparkFunSuite
val driverPod = kubernetesTestComponents.kubernetesClient
.pods()
- .withLabel("spark-app-locator", appLocator)
+ .withLabel("spark-app-locator", customAppLocator.getOrElse(appLocator))
.withLabel("spark-role", "driver")
.list()
.getItems
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/VolcanoTestsSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/VolcanoTestsSuite.scala
index 377a1b8..7ffd28b 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/VolcanoTestsSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/VolcanoTestsSuite.scala
@@ -16,16 +16,34 @@
*/
package org.apache.spark.deploy.k8s.integrationtest
+import java.io.{File, FileInputStream}
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+// scalastyle:off executioncontextglobal
+import scala.concurrent.ExecutionContext.Implicits.global
+// scalastyle:on executioncontextglobal
+import scala.concurrent.Future
+
import io.fabric8.kubernetes.api.model.Pod
+import io.fabric8.kubernetes.client.NamespacedKubernetesClient
import io.fabric8.volcano.client.VolcanoClient
+import org.scalatest.concurrent.Eventually
import org.apache.spark.SparkFunSuite
+import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.features.VolcanoFeatureStep
-import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite.k8sTestTag
-import org.apache.spark.deploy.k8s.integrationtest.VolcanoSuite.volcanoTag
+import org.apache.spark.internal.config.NETWORK_AUTH_ENABLED
private[spark] trait VolcanoTestsSuite { k8sSuite: KubernetesSuite =>
import VolcanoTestsSuite._
+ import org.apache.spark.deploy.k8s.integrationtest.VolcanoSuite.volcanoTag
+ import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite.{k8sTestTag, INTERVAL, TIMEOUT}
+
+ lazy val volcanoClient: VolcanoClient
+ = kubernetesTestComponents.kubernetesClient.adapt(classOf[VolcanoClient])
+ lazy val k8sClient: NamespacedKubernetesClient = kubernetesTestComponents.kubernetesClient
protected def checkScheduler(pod: Pod): Unit = {
assert(pod.getSpec.getSchedulerName === "volcano")
@@ -37,12 +55,81 @@ private[spark] trait VolcanoTestsSuite { k8sSuite: KubernetesSuite =>
assert(annotations.get("scheduling.k8s.io/group-name") === s"$appId-podgroup")
}
- protected def checkPodGroup(pod: Pod): Unit = {
+ protected def checkPodGroup(
+ pod: Pod,
+ queue: Option[String] = None): Unit = {
val appId = pod.getMetadata.getLabels.get("spark-app-selector")
val podGroupName = s"$appId-podgroup"
- val volcanoClient = kubernetesTestComponents.kubernetesClient.adapt(classOf[VolcanoClient])
val podGroup = volcanoClient.podGroups().withName(podGroupName).get()
assert(podGroup.getMetadata.getOwnerReferences.get(0).getName === pod.getMetadata.getName)
+ queue.foreach(q => assert(q === podGroup.getSpec.getQueue))
+ }
+
+ private def createOrReplaceYAMLResource(yamlPath: String): Unit = {
+ k8sClient.load(new FileInputStream(yamlPath)).createOrReplace()
+ }
+
+ private def deleteYAMLResource(yamlPath: String): Unit = {
+ k8sClient.load(new FileInputStream(yamlPath)).delete()
+ }
+
+ private def getPods(
+ role: String,
+ groupLocator: String,
+ statusPhase: String): mutable.Buffer[Pod] = {
+ k8sClient
+ .pods()
+ .withLabel("spark-group-locator", groupLocator)
+ .withLabel("spark-role", role)
+ .withField("status.phase", statusPhase)
+ .list()
+ .getItems.asScala
+ }
+
+ def runJobAndVerify(
+ batchSuffix: String,
+ groupLoc: Option[String] = None,
+ queue: Option[String] = None): Unit = {
+ val appLoc = s"${appLocator}${batchSuffix}"
+ val podName = s"${driverPodName}-${batchSuffix}"
+ // create new configuration for every job
+ val conf = createVolcanoSparkConf(podName, appLoc, groupLoc, queue)
+ runSparkPiAndVerifyCompletion(
+ driverPodChecker = (driverPod: Pod) => {
+ checkScheduler(driverPod)
+ checkAnnotaion(driverPod)
+ checkPodGroup(driverPod, queue)
+ },
+ executorPodChecker = (executorPod: Pod) => {
+ checkScheduler(executorPod)
+ checkAnnotaion(executorPod)
+ },
+ customSparkConf = Option(conf),
+ customAppLocator = Option(appLoc)
+ )
+ }
+
+ private def createVolcanoSparkConf(
+ driverPodName: String = driverPodName,
+ appLoc: String = appLocator,
+ groupLoc: Option[String] = None,
+ queue: Option[String] = None): SparkAppConf = {
+ val conf = kubernetesTestComponents.newSparkAppConf()
+ .set(CONTAINER_IMAGE.key, image)
+ .set(KUBERNETES_DRIVER_POD_NAME.key, driverPodName)
+ .set(s"${KUBERNETES_DRIVER_LABEL_PREFIX}spark-app-locator", appLoc)
+ .set(s"${KUBERNETES_EXECUTOR_LABEL_PREFIX}spark-app-locator", appLoc)
+ .set(NETWORK_AUTH_ENABLED.key, "true")
+ // below is volcano specific configuration
+ .set(KUBERNETES_SCHEDULER_NAME.key, "volcano")
+ .set(KUBERNETES_DRIVER_POD_FEATURE_STEPS.key, VOLCANO_FEATURE_STEP)
+ .set(KUBERNETES_EXECUTOR_POD_FEATURE_STEPS.key, VOLCANO_FEATURE_STEP)
+ queue.foreach(conf.set(KUBERNETES_JOB_QUEUE.key, _))
+ groupLoc.foreach { locator =>
+ conf.set(s"${KUBERNETES_DRIVER_LABEL_PREFIX}spark-group-locator", locator)
+ conf.set(s"${KUBERNETES_EXECUTOR_LABEL_PREFIX}spark-group-locator", locator)
+ }
+ conf
}
test("Run SparkPi with volcano scheduler", k8sTestTag, volcanoTag) {
@@ -63,8 +150,55 @@ private[spark] trait VolcanoTestsSuite { k8sSuite: KubernetesSuite =>
}
)
}
+
+ test("SPARK-38188: Run SparkPi jobs with 2 queues (only 1 enable)", k8sTestTag, volcanoTag) {
+ // Disabled queue0 and enabled queue1
+ createOrReplaceYAMLResource(VOLCANO_Q0_DISABLE_Q1_ENABLE_YAML)
+ // Submit jobs into disabled queue0 and enabled queue1
+ val jobNum = 4
+ (1 to jobNum).foreach { i =>
+ Future {
+ val queueName = s"queue${i % 2}"
+ runJobAndVerify(i.toString, Option(s"$GROUP_PREFIX-$queueName"), Option(queueName))
+ }
+ }
+ // There are two `Succeeded` jobs and two `Pending` jobs
+ Eventually.eventually(TIMEOUT, INTERVAL) {
+ val completedPods = getPods("driver", s"$GROUP_PREFIX-queue1", "Succeeded")
+ assert(completedPods.size === 2)
+ val pendingPods = getPods("driver", s"$GROUP_PREFIX-queue0", "Pending")
+ assert(pendingPods.size === 2)
+ }
+ deleteYAMLResource(VOLCANO_Q0_DISABLE_Q1_ENABLE_YAML)
+ }
+
+ test("SPARK-38188: Run SparkPi jobs with 2 queues (all enable)", k8sTestTag, volcanoTag) {
+ // Enable all queues
+ createOrReplaceYAMLResource(VOLCANO_ENABLE_Q0_AND_Q1_YAML)
+ val jobNum = 4
+ // Submit jobs into these two queues
+ (1 to jobNum).foreach { i =>
+ Future {
+ val queueName = s"queue${i % 2}"
+ runJobAndVerify(i.toString, Option(s"$GROUP_PREFIX"), Option(queueName))
+ }
+ }
+ // All jobs "Succeeded"
+ Eventually.eventually(TIMEOUT, INTERVAL) {
+ val completedPods = getPods("driver", GROUP_PREFIX, "Succeeded")
+ assert(completedPods.size === jobNum)
+ }
+ deleteYAMLResource(VOLCANO_ENABLE_Q0_AND_Q1_YAML)
+ }
}
private[spark] object VolcanoTestsSuite extends SparkFunSuite {
val VOLCANO_FEATURE_STEP = classOf[VolcanoFeatureStep].getName
+ val VOLCANO_ENABLE_Q0_AND_Q1_YAML = new File(
+ getClass.getResource("/volcano/enable-queue0-enable-queue1.yml").getFile
+ ).getAbsolutePath
+ val VOLCANO_Q0_DISABLE_Q1_ENABLE_YAML = new File(
+ getClass.getResource("/volcano/disable-queue0-enable-queue1.yml").getFile
+ ).getAbsolutePath
+ val GROUP_PREFIX = "volcano-test" + UUID.randomUUID().toString.replaceAll("-", "")
}
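As a side note, a hypothetical debugging helper (not part of this patch) showing how the same fabric8 `VolcanoClient` used above could be queried to see which PodGroups ended up in a given queue while these integration tests run:
```scala
import scala.collection.JavaConverters._

import io.fabric8.volcano.client.VolcanoClient
import io.fabric8.volcano.scheduling.v1beta1.PodGroup

object VolcanoQueueInspector {
  // Assumes an already-configured VolcanoClient; the suite above obtains one
  // by adapting the Kubernetes test client. Lists PodGroups whose spec names
  // the given queue, skipping PodGroups that have no spec yet.
  def podGroupsInQueue(volcanoClient: VolcanoClient, queue: String): Seq[PodGroup] =
    volcanoClient.podGroups()
      .list()
      .getItems.asScala
      .filter(pg => Option(pg.getSpec).exists(spec => queue == spec.getQueue))
      .toSeq
}
```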