Posted to issues@yunikorn.apache.org by ma...@apache.org on 2023/09/07 05:47:23 UTC
[yunikorn-k8shim] branch master updated: [YUNIKORN-1935] add gang scheduling with priority test (#659)
This is an automated email from the ASF dual-hosted git repository.
mani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-k8shim.git
The following commit(s) were added to refs/heads/master by this push:
new 358751a6 [YUNIKORN-1935] add gang scheduling with priority test (#659)
358751a6 is described below
commit 358751a6dd3c7fd4489d301bbcdb3292397c3363
Author: PoAn Yang <pa...@apache.org>
AuthorDate: Thu Sep 7 11:17:09 2023 +0530
[YUNIKORN-1935] add gang scheduling with priority test (#659)
Closes: #659
Signed-off-by: Manikandan R <ma...@gmail.com>
---
.../priority_scheduling_test.go | 192 ++++++++++++++++++++-
1 file changed, 185 insertions(+), 7 deletions(-)
diff --git a/test/e2e/priority_scheduling/priority_scheduling_test.go b/test/e2e/priority_scheduling/priority_scheduling_test.go
index 6514090b..40dd8ce1 100644
--- a/test/e2e/priority_scheduling/priority_scheduling_test.go
+++ b/test/e2e/priority_scheduling/priority_scheduling_test.go
@@ -28,6 +28,7 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
"github.com/apache/yunikorn-core/pkg/common/configs"
+ "github.com/apache/yunikorn-k8shim/pkg/appmgmt/interfaces"
"github.com/apache/yunikorn-k8shim/pkg/common/constants"
tests "github.com/apache/yunikorn-k8shim/test/e2e"
"github.com/apache/yunikorn-k8shim/test/e2e/framework/helpers/common"
@@ -41,15 +42,17 @@ const (
requestMem = "100M"
)
-var rr = &v1.ResourceRequirements{
- Requests: v1.ResourceList{
- v1.ResourceCPU: resource.MustParse(requestCPU),
- v1.ResourceMemory: resource.MustParse(requestMem),
- },
-}
+var (
+ ns string
+ rr = &v1.ResourceRequirements{
+ Requests: v1.ResourceList{
+ v1.ResourceCPU: resource.MustParse(requestCPU),
+ v1.ResourceMemory: resource.MustParse(requestMem),
+ },
+ }
+)
var _ = ginkgo.Describe("PriorityScheduling", func() {
- var ns string
var namespace *v1.Namespace
var err error
var oldConfigMap = new(v1.ConfigMap)
@@ -251,6 +254,158 @@ var _ = ginkgo.Describe("PriorityScheduling", func() {
validatePodSchedulingOrder(ns, sleepPodConf, lowPodConf, normalPodConf, highPodConf)
})
+ ginkgo.It("Verify_Gang_Scheduling_With_Priority", func() {
+ By("Setting custom YuniKorn configuration")
+ annotation = "ann-" + common.RandSeq(10)
+ yunikorn.UpdateCustomConfigMapWrapper(oldConfigMap, "fifo", annotation, func(sc *configs.SchedulerConfig) error {
+ // remove placement rules so we can control the queue
+ sc.Partitions[0].PlacementRules = nil
+
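+ // The queue max (100m CPU / 100M memory) equals a single pod's request,
+ // so at most one test pod fits in root.default at a time.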
+ if err = common.AddQueue(sc, "default", "root", configs.QueueConfig{
+ Name: "default",
+ Parent: false,
+ Resources: configs.Resources{Max: map[string]string{siCommon.CPU: "100m", siCommon.Memory: "100M"}},
+ }); err != nil {
+ return err
+ }
+
+ return nil
+ })
+ sleepPodConf = k8s.TestPodConfig{
+ Name: "test-sleep-" + common.RandSeq(5),
+ Labels: map[string]string{
+ constants.LabelQueueName: "root.default",
+ constants.LabelApplicationID: "app-sleep-" + common.RandSeq(5)},
+ Namespace: ns,
+ Resources: rr,
+ }
+
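+ // Mirror the pod's resource requests as each task group's MinResource,
+ // so every gang needs exactly one pod-sized placeholder.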
+ taskGroupMinResource := map[string]resource.Quantity{}
+ for k, v := range rr.Requests {
+ taskGroupMinResource[k.String()] = v
+ }
+ lowPodConf = createPodConfWithTaskGroup("low", lowPriorityClass.Name, taskGroupMinResource)
+ normalPodConf = createPodConfWithTaskGroup("normal", normalPriorityClass.Name, taskGroupMinResource)
+ highPodConf = createPodConfWithTaskGroup("high", highPriorityClass.Name, taskGroupMinResource)
+
+ var sleepPod, lowPod, normalPod, highPod *v1.Pod
+ By("Create sleep pod to consume queue")
+ sleepPod, err = k8s.InitTestPod(sleepPodConf)
+ Ω(err).NotTo(gomega.HaveOccurred())
+ sleepPod, err = kubeClient.CreatePod(sleepPod, ns)
+ Ω(err).NotTo(gomega.HaveOccurred())
+ err = kubeClient.WaitForPodRunning(ns, sleepPod.Name, 1*time.Minute)
+ Ω(err).NotTo(gomega.HaveOccurred())
+
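+ // The sleep pod now occupies the whole queue: the gang placeholder pods
+ // created for the jobs below will stay Pending until it is deleted.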
+ By("Submit low priority job")
+ lowPod, err = k8s.InitTestPod(lowPodConf)
+ Ω(err).NotTo(gomega.HaveOccurred())
+ lowJob := k8s.InitTestJob(lowPod.Labels[constants.LabelApplicationID], 1, 1, lowPod)
+ lowJob, err = kubeClient.CreateJob(lowJob, ns)
+ Ω(err).NotTo(gomega.HaveOccurred())
+ err = kubeClient.WaitForJobPodsCreated(ns, lowJob.Name, 1, 30*time.Second)
+ Ω(err).NotTo(gomega.HaveOccurred())
+
+ By("Submit normal priority job")
+ normalPod, err = k8s.InitTestPod(normalPodConf)
+ Ω(err).NotTo(gomega.HaveOccurred())
+ normalJob := k8s.InitTestJob(normalPod.Labels[constants.LabelApplicationID], 1, 1, normalPod)
+ normalJob, err = kubeClient.CreateJob(normalJob, ns)
+ Ω(err).NotTo(gomega.HaveOccurred())
+ err = kubeClient.WaitForJobPodsCreated(ns, normalJob.Name, 1, 30*time.Second)
+ Ω(err).NotTo(gomega.HaveOccurred())
+
+ By("Submit high priority job")
+ highPod, err = k8s.InitTestPod(highPodConf)
+ Ω(err).NotTo(gomega.HaveOccurred())
+ highJob := k8s.InitTestJob(highPod.Labels[constants.LabelApplicationID], 1, 1, highPod)
+ highJob, err = kubeClient.CreateJob(highJob, ns)
+ Ω(err).NotTo(gomega.HaveOccurred())
+ err = kubeClient.WaitForJobPodsCreated(ns, highJob.Name, 1, 30*time.Second)
+ Ω(err).NotTo(gomega.HaveOccurred())
+
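+ // Give the shim time to create the job pods and register all three gang
+ // apps before sampling their state.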
+ By("Wait for scheduler state to settle")
+ time.Sleep(10 * time.Second)
+
+ var lowPods, normalPods, highPods *v1.PodList
+ lowPods, err = kubeClient.ListPods(ns, fmt.Sprintf("job-name=%s", lowJob.Name))
+ Ω(err).NotTo(gomega.HaveOccurred())
+ lowPod = &lowPods.Items[0]
+
+ normalPods, err = kubeClient.ListPods(ns, fmt.Sprintf("job-name=%s", normalJob.Name))
+ Ω(err).NotTo(gomega.HaveOccurred())
+ normalPod = &normalPods.Items[0]
+
+ highPods, err = kubeClient.ListPods(ns, fmt.Sprintf("job-name=%s", highJob.Name))
+ Ω(err).NotTo(gomega.HaveOccurred())
+ highPod = &highPods.Items[0]
+
+ By("Ensure no test pods are running")
+ ensureNotRunning(ns, lowPod, normalPod, highPod)
+
+ By("Kill sleep pod to make room for test pods")
+ err = kubeClient.DeletePod(sleepPod.Name, ns)
+ Ω(err).NotTo(gomega.HaveOccurred())
+
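+ // Once capacity frees up, the highest-priority gang's placeholder is
+ // allocated first and then replaced by the real pod; waiting for the
+ // placeholder to terminate confirms the swap happened.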
+ By("Wait for high-priority placeholders terminated")
+ var tgPlaceHolders map[string][]string
+ tgPlaceHolders = yunikorn.GetPlaceholderNames(highPodConf.Annotations, highPodConf.Labels["applicationId"])
+ for _, phNames := range tgPlaceHolders {
+ for _, ph := range phNames {
+ phTermErr := kubeClient.WaitForPodTerminated(ns, ph, 1*time.Minute)
+ Ω(phTermErr).NotTo(HaveOccurred())
+ }
+ }
+
+ By("Wait for high-priority pod to begin running")
+ err = kubeClient.WaitForPodRunning(ns, highPod.Name, 1*time.Minute)
+ Ω(err).NotTo(gomega.HaveOccurred())
+
+ By("Ensure low and normal priority pods are not running")
+ ensureNotRunning(ns, lowPod, normalPod)
+
+ By("Kill high-priority job")
+ err = kubeClient.DeleteJob(highJob.Name, ns)
+ Ω(err).NotTo(gomega.HaveOccurred())
+
+ By("Wait for normal-priority placeholders terminated")
+ tgPlaceHolders = yunikorn.GetPlaceholderNames(normalPodConf.Annotations, normalPodConf.Labels["applicationId"])
+ for _, phNames := range tgPlaceHolders {
+ for _, ph := range phNames {
+ phTermErr := kubeClient.WaitForPodTerminated(ns, ph, 1*time.Minute)
+ Ω(phTermErr).NotTo(HaveOccurred())
+ }
+ }
+
+ By("Wait for normal-priority pod to begin running")
+ err = kubeClient.WaitForPodRunning(ns, normalPod.Name, 1*time.Minute)
+ Ω(err).NotTo(gomega.HaveOccurred())
+
+ By("Ensure low priority pod is not running")
+ ensureNotRunning(ns, lowPod)
+
+ By("Kill normal-priority job")
+ err = kubeClient.DeleteJob(normalJob.Name, ns)
+ Ω(err).NotTo(gomega.HaveOccurred())
+
+ By("Wait for low-priority placeholders terminated")
+ tgPlaceHolders = yunikorn.GetPlaceholderNames(lowPodConf.Annotations, lowPodConf.Labels["applicationId"])
+ for _, phNames := range tgPlaceHolders {
+ for _, ph := range phNames {
+ phTermErr := kubeClient.WaitForPodTerminated(ns, ph, 1*time.Minute)
+ Ω(phTermErr).NotTo(HaveOccurred())
+ }
+ }
+
+ By("Wait for low-priority pod to begin running")
+ err = kubeClient.WaitForPodRunning(ns, lowPod.Name, 1*time.Minute)
+ Ω(err).NotTo(gomega.HaveOccurred())
+
+ By("Kill low-priority job")
+ err = kubeClient.DeleteJob(lowJob.Name, ns)
+ Ω(err).NotTo(gomega.HaveOccurred())
+ })
+
ginkgo.AfterEach(func() {
testDescription := ginkgo.CurrentSpecReport()
if testDescription.Failed() {
@@ -355,3 +510,26 @@ func ensureNotRunning(ns string, pods ...*v1.Pod) {
Ω(podResult.Status.Phase).ShouldNot(Equal(v1.PodRunning), pod.Name)
}
}
+
+func createPodConfWithTaskGroup(name, priorityClassName string, taskGroupMinResource map[string]resource.Quantity) k8s.TestPodConfig {
+ return k8s.TestPodConfig{
+ Name: fmt.Sprintf("test-%s-priority-%s", name, common.RandSeq(5)),
+ Labels: map[string]string{
+ constants.LabelQueueName: "root.default",
+ constants.LabelApplicationID: fmt.Sprintf("app-%s-%s", name, common.RandSeq(5))},
+ Namespace: ns,
+ Annotations: &k8s.PodAnnotation{
+ TaskGroupName: "group-" + name,
+ TaskGroups: []interfaces.TaskGroup{
+ {
+ Name: "group-" + name,
+ MinMember: int32(1),
+ MinResource: taskGroupMinResource,
+ },
+ },
+ },
+ Resources: rr,
+ PriorityClassName: priorityClassName,
+ RestartPolicy: v1.RestartPolicyNever,
+ }
+}
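
For readers unfamiliar with the k8s.PodAnnotation helper used above: it serializes the task groups into YuniKorn's gang-scheduling pod annotations. As a minimal standalone sketch, assuming the documented annotation keys yunikorn.apache.org/task-group-name and yunikorn.apache.org/task-groups, and a hypothetical buildGangPod helper that is not part of this commit, the equivalent raw pod metadata could be built like this:

package main

import (
	"encoding/json"
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// taskGroup mirrors the JSON fields YuniKorn expects in the
// yunikorn.apache.org/task-groups payload (name, minMember, minResource).
type taskGroup struct {
	Name        string            `json:"name"`
	MinMember   int32             `json:"minMember"`
	MinResource map[string]string `json:"minResource"`
}

// buildGangPod is a hypothetical illustration of what the test's
// createPodConfWithTaskGroup ultimately produces on the pod object.
func buildGangPod(name string) (*v1.Pod, error) {
	groups := []taskGroup{{
		Name:        "group-" + name,
		MinMember:   1,
		MinResource: map[string]string{"cpu": "100m", "memory": "100M"},
	}}
	payload, err := json.Marshal(groups)
	if err != nil {
		return nil, err
	}
	return &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name: "test-" + name,
			Annotations: map[string]string{
				"yunikorn.apache.org/task-group-name": "group-" + name,
				"yunikorn.apache.org/task-groups":     string(payload),
			},
		},
	}, nil
}

func main() {
	pod, _ := buildGangPod("high")
	fmt.Println(pod.Annotations["yunikorn.apache.org/task-groups"])
}

The JSON fields correspond one-to-one to the interfaces.TaskGroup values set in createPodConfWithTaskGroup; MinMember of 1 with a pod-sized MinResource means each gang reserves exactly one placeholder.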