Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/02/18 07:49:24 UTC

[GitHub] [spark] martin-g commented on a change in pull request #35553: [SPARK-38188][K8S] Support queue scheduling with Volcano implementations

martin-g commented on a change in pull request #35553:
URL: https://github.com/apache/spark/pull/35553#discussion_r809739896



##########
File path: resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
##########
@@ -212,7 +212,9 @@ class KubernetesSuite extends SparkFunSuite
       driverPodChecker: Pod => Unit = doBasicDriverPodCheck,
       executorPodChecker: Pod => Unit = doBasicExecutorPodCheck,
       appArgs: Array[String] = Array.empty[String],
-      isJVM: Boolean = true ): Unit = {
+      isJVM: Boolean = true,
+      sparkConf: Option[SparkAppConf] = None,
+      appLoc: Option[String] = None): Unit = {

Review comment:
       `appLoc` is not a very clear name (at least to me). I had to scroll down to see that it is used for `spark-app-locator`.
   Maybe rename it to `appLocatorOverride`?!
   If you like the idea, then maybe do the same for `sparkConf` -> `sparkConfOverride`.
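   For concreteness, a hypothetical, self-contained sketch of the suggested renames (the method and `SparkAppConf` are stubbed here; only the parameter names are the point):

```scala
object RenameSketch {
  // SparkAppConf is stubbed for this sketch; in the test framework it is a real type
  case class SparkAppConf(entries: Map[String, String] = Map.empty)

  def runSparkApplication(
      isJVM: Boolean = true,
      sparkConfOverride: Option[SparkAppConf] = None,
      appLocatorOverride: Option[String] = None): Unit = {
    // the *Override suffix signals that None falls back to the suite-wide default
    val appLocator = appLocatorOverride.getOrElse("default-app-locator")
    println(s"isJVM=$isJVM, appLocator=$appLocator, conf=$sparkConfOverride")
  }

  def main(args: Array[String]): Unit =
    runSparkApplication(appLocatorOverride = Some("spark-app-locator-custom"))
}
```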

##########
File path: resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/VolcanoTestsSuite.scala
##########
@@ -16,16 +16,33 @@
  */
 package org.apache.spark.deploy.k8s.integrationtest
 
+import java.io.{File, FileInputStream}
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.concurrent._
+
+import ExecutionContext.Implicits.global
 import io.fabric8.kubernetes.api.model.Pod
+import io.fabric8.kubernetes.client.NamespacedKubernetesClient
 import io.fabric8.volcano.client.VolcanoClient
+import org.scalatest.concurrent.Eventually
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.features.VolcanoFeatureStep
-import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite.k8sTestTag
-import org.apache.spark.deploy.k8s.integrationtest.VolcanoSuite.volcanoTag
+import org.apache.spark.internal.config.NETWORK_AUTH_ENABLED
 
 private[spark] trait VolcanoTestsSuite { k8sSuite: KubernetesSuite =>
+  import VolcanoSuite._
   import VolcanoTestsSuite._
+  import KubernetesSuite._
+
+  lazy val volcanoClient: VolcanoClient
+    = kubernetesTestComponents.kubernetesClient.adapt(classOf[VolcanoClient])

Review comment:
       In https://github.com/apache/spark/pull/35555 I suggested closing the KubernetesClient in `cleanUp()`. Is the same needed for `volcanoClient`?
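   For reference, a minimal sketch of what that could look like inside this suite trait (assuming the `cleanUp()` hook proposed in #35555 exists; `volcanoClient` is the lazy val above):

```scala
// Sketch only, assuming the cleanUp() teardown hook from #35555;
// fabric8 clients implement Closeable. Note that adapt() may share the
// underlying HTTP client, so close the adapted client before tearing down
// the parent KubernetesClient.
override def cleanUp(): Unit = {
  volcanoClient.close()
  super.cleanUp()
}
```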

##########
File path: resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
##########
@@ -221,7 +223,10 @@ class KubernetesSuite extends SparkFunSuite
       appArgs,
       driverPodChecker,
       executorPodChecker,
-      isJVM)
+      isJVM,
+      sparkConf = sparkConf,

Review comment:
       nit: no need for the RHS here; since the argument name matches the parameter name, `sparkConf = sparkConf` can be written as just `sparkConf`.
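   A self-contained illustration of the nit (hypothetical names, not the PR's actual method):

```scala
object NamedArgNit {
  // hypothetical stand-in for the real method; only the call sites matter here
  def launch(isJVM: Boolean = true, sparkConf: Option[String] = None): Unit =
    println(s"isJVM=$isJVM, sparkConf=$sparkConf")

  def main(args: Array[String]): Unit = {
    val sparkConf = Some("spark.foo=bar")
    launch(isJVM = false, sparkConf = sparkConf) // named form, as in the diff
    launch(false, sparkConf)                     // equivalent when the argument order matches
  }
}
```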

##########
File path: resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/VolcanoTestsSuite.scala
##########
@@ -37,12 +54,81 @@ private[spark] trait VolcanoTestsSuite { k8sSuite: KubernetesSuite =>
     assert(annotations.get("scheduling.k8s.io/group-name") === s"$appId-podgroup")
   }
 
-  protected def checkPodGroup(pod: Pod): Unit = {
+  protected def checkPodGroup(
+      pod: Pod,
+      queue: Option[String] = None): Unit = {
     val appId = pod.getMetadata.getLabels.get("spark-app-selector")
     val podGroupName = s"$appId-podgroup"
-    val volcanoClient = kubernetesTestComponents.kubernetesClient.adapt(classOf[VolcanoClient])
     val podGroup = volcanoClient.podGroups().withName(podGroupName).get()
     assert(podGroup.getMetadata.getOwnerReferences.get(0).getName === pod.getMetadata.getName)
+    val spec = podGroup.getSpec
+    if (queue.isDefined) assert(spec.getQueue === queue.get)
+  }
+
+  private def createOrReplaceYAMLResource(yamlPath: String): Unit = {
+    k8sClient.load(new FileInputStream(yamlPath)).createOrReplace()
+  }
+
+  private def deleteYAMLResource(yamlPath: String): Unit = {
+    k8sClient.load(new FileInputStream(yamlPath)).delete()
+  }
+
+  private def getPods(role: String, groupLoc: String, statusPhase: String): mutable.Buffer[Pod] = {

Review comment:
       s/groupLoc/groupLocator/ ?
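   i.e., the rename applied to the quoted code (behavior unchanged; `k8sClient` and the imports are the suite's):

```scala
private def getPods(
    role: String,
    groupLocator: String,
    statusPhase: String): mutable.Buffer[Pod] = {
  k8sClient
    .pods()
    .withLabel("spark-group-locator", groupLocator)
    .withLabel("spark-role", role)
    .withField("status.phase", statusPhase)
    .list()
    .getItems.asScala
}
```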

##########
File path: resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/VolcanoTestsSuite.scala
##########
@@ -37,12 +54,81 @@ private[spark] trait VolcanoTestsSuite { k8sSuite: KubernetesSuite =>
     assert(annotations.get("scheduling.k8s.io/group-name") === s"$appId-podgroup")
   }
 
-  protected def checkPodGroup(pod: Pod): Unit = {
+  protected def checkPodGroup(
+      pod: Pod,
+      queue: Option[String] = None): Unit = {
     val appId = pod.getMetadata.getLabels.get("spark-app-selector")
     val podGroupName = s"$appId-podgroup"
-    val volcanoClient = kubernetesTestComponents.kubernetesClient.adapt(classOf[VolcanoClient])
     val podGroup = volcanoClient.podGroups().withName(podGroupName).get()
     assert(podGroup.getMetadata.getOwnerReferences.get(0).getName === pod.getMetadata.getName)
+    val spec = podGroup.getSpec
+    if (queue.isDefined) assert(spec.getQueue === queue.get)
+  }
+
+  private def createOrReplaceYAMLResource(yamlPath: String): Unit = {
+    k8sClient.load(new FileInputStream(yamlPath)).createOrReplace()
+  }
+
+  private def deleteYAMLResource(yamlPath: String): Unit = {
+    k8sClient.load(new FileInputStream(yamlPath)).delete()
+  }
+
+  private def getPods(role: String, groupLoc: String, statusPhase: String): mutable.Buffer[Pod] = {
+    k8sClient
+      .pods()
+      .withLabel("spark-group-locator", groupLoc)
+      .withLabel("spark-role", role)
+      .withField("status.phase", statusPhase)
+      .list()
+      .getItems.asScala
+  }
+
+  def runJobAndVerify(
+      batchSuffix: String,
+      groupLoc: Option[String] = None,
+      queue: Option[String] = None): Unit = {
+    val appLoc = s"${appLocator}${batchSuffix}"
+    val podName = s"${driverPodName}-${batchSuffix}"
+    // create new configuration for every job
+    val conf = createVolcanoSparkConf(
+      driverPodName = podName, appLoc = appLoc, groupLoc = groupLoc, queue

Review comment:
       No need for the RHSs (`= appLoc`, `= groupLoc`): when the argument name matches the parameter name, the value can be passed positionally, e.g. `createVolcanoSparkConf(driverPodName = podName, appLoc, groupLoc, queue)`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


