Posted to issues@yunikorn.apache.org by ma...@apache.org on 2023/09/07 05:47:23 UTC

[yunikorn-k8shim] branch master updated: [YUNIKORN-1935] add gang scheduling with priority test (#659)

This is an automated email from the ASF dual-hosted git repository.

mani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-k8shim.git


The following commit(s) were added to refs/heads/master by this push:
     new 358751a6 [YUNIKORN-1935] add gang scheduling with priority test (#659)
358751a6 is described below

commit 358751a6dd3c7fd4489d301bbcdb3292397c3363
Author: PoAn Yang <pa...@apache.org>
AuthorDate: Thu Sep 7 11:17:09 2023 +0530

    [YUNIKORN-1935] add gang scheduling with priority test (#659)
    
    Closes: #659
    
    Signed-off-by: Manikandan R <ma...@gmail.com>
---
 .../priority_scheduling_test.go                    | 192 ++++++++++++++++++++-
 1 file changed, 185 insertions(+), 7 deletions(-)
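
Before the diff itself, a brief note on the mechanism the new test exercises: the custom queue root.default is capped at 100m CPU / 100M memory, small enough that only one test pod (or one of its gang placeholders) fits at a time, so as capacity frees up YuniKorn has to admit the pending applications strictly in priority order. The sketch below shows, in isolation, how such a gang-scheduled pod config is built. It is a minimal illustration assuming the helper types visible in the diff (k8s.TestPodConfig, k8s.PodAnnotation, interfaces.TaskGroup); the helpers/k8s import path and the buildGangPodConf name are inferred for illustration and are not part of the commit.

    package example // hypothetical standalone sketch, not part of this commit

    import (
        "fmt"

        v1 "k8s.io/api/core/v1"
        "k8s.io/apimachinery/pkg/api/resource"

        "github.com/apache/yunikorn-k8shim/pkg/appmgmt/interfaces"
        "github.com/apache/yunikorn-k8shim/pkg/common/constants"
        // Import path inferred; the diff only shows the helpers/common import.
        "github.com/apache/yunikorn-k8shim/test/e2e/framework/helpers/k8s"
    )

    // buildGangPodConf mirrors the commit's createPodConfWithTaskGroup helper:
    // the pod declares a task group of MinMember members, each needing
    // MinResource, and YuniKorn reserves placeholder pods for the whole group
    // before the real pod is bound.
    func buildGangPodConf(ns, name, priorityClassName string) k8s.TestPodConfig {
        rr := &v1.ResourceRequirements{
            Requests: v1.ResourceList{
                v1.ResourceCPU:    resource.MustParse("100m"),
                v1.ResourceMemory: resource.MustParse("100M"),
            },
        }

        // Task-group minimum resources are keyed by plain strings ("cpu", "memory").
        minRes := map[string]resource.Quantity{}
        for k, v := range rr.Requests {
            minRes[k.String()] = v
        }

        return k8s.TestPodConfig{
            Name:      fmt.Sprintf("test-%s-priority", name),
            Namespace: ns,
            Labels: map[string]string{
                constants.LabelQueueName:     "root.default",
                constants.LabelApplicationID: "app-" + name,
            },
            Annotations: &k8s.PodAnnotation{
                TaskGroupName: "group-" + name,
                TaskGroups: []interfaces.TaskGroup{
                    {
                        Name:        "group-" + name,
                        MinMember:   int32(1),
                        MinResource: minRes,
                    },
                },
            },
            Resources:         rr,
            PriorityClassName: priorityClassName,
            RestartPolicy:     v1.RestartPolicyNever,
        }
    }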

diff --git a/test/e2e/priority_scheduling/priority_scheduling_test.go b/test/e2e/priority_scheduling/priority_scheduling_test.go
index 6514090b..40dd8ce1 100644
--- a/test/e2e/priority_scheduling/priority_scheduling_test.go
+++ b/test/e2e/priority_scheduling/priority_scheduling_test.go
@@ -28,6 +28,7 @@ import (
 	"k8s.io/apimachinery/pkg/api/resource"
 
 	"github.com/apache/yunikorn-core/pkg/common/configs"
+	"github.com/apache/yunikorn-k8shim/pkg/appmgmt/interfaces"
 	"github.com/apache/yunikorn-k8shim/pkg/common/constants"
 	tests "github.com/apache/yunikorn-k8shim/test/e2e"
 	"github.com/apache/yunikorn-k8shim/test/e2e/framework/helpers/common"
@@ -41,15 +42,17 @@ const (
 	requestMem = "100M"
 )
 
-var rr = &v1.ResourceRequirements{
-	Requests: v1.ResourceList{
-		v1.ResourceCPU:    resource.MustParse(requestCPU),
-		v1.ResourceMemory: resource.MustParse(requestMem),
-	},
-}
+var (
+	ns string
+	rr = &v1.ResourceRequirements{
+		Requests: v1.ResourceList{
+			v1.ResourceCPU:    resource.MustParse(requestCPU),
+			v1.ResourceMemory: resource.MustParse(requestMem),
+		},
+	}
+)
 
 var _ = ginkgo.Describe("PriorityScheduling", func() {
-	var ns string
 	var namespace *v1.Namespace
 	var err error
 	var oldConfigMap = new(v1.ConfigMap)
@@ -251,6 +254,158 @@ var _ = ginkgo.Describe("PriorityScheduling", func() {
 		validatePodSchedulingOrder(ns, sleepPodConf, lowPodConf, normalPodConf, highPodConf)
 	})
 
+	ginkgo.It("Verify_Gang_Scheduling_With_Priority", func() {
+		By("Setting custom YuniKorn configuration")
+		annotation = "ann-" + common.RandSeq(10)
+		yunikorn.UpdateCustomConfigMapWrapper(oldConfigMap, "fifo", annotation, func(sc *configs.SchedulerConfig) error {
+			// remove placement rules so we can control the queue
+			sc.Partitions[0].PlacementRules = nil
+
+			if err = common.AddQueue(sc, "default", "root", configs.QueueConfig{
+				Name:      "default",
+				Parent:    false,
+				Resources: configs.Resources{Max: map[string]string{siCommon.CPU: "100m", siCommon.Memory: "100M"}},
+			}); err != nil {
+				return err
+			}
+
+			return nil
+		})
+		sleepPodConf = k8s.TestPodConfig{
+			Name: "test-sleep-" + common.RandSeq(5),
+			Labels: map[string]string{
+				constants.LabelQueueName:     "root.default",
+				constants.LabelApplicationID: "app-sleep-" + common.RandSeq(5)},
+			Namespace: ns,
+			Resources: rr,
+		}
+
+		taskGroupMinResource := map[string]resource.Quantity{}
+		for k, v := range rr.Requests {
+			taskGroupMinResource[k.String()] = v
+		}
+		lowPodConf = createPodConfWithTaskGroup("low", lowPriorityClass.Name, taskGroupMinResource)
+		normalPodConf = createPodConfWithTaskGroup("normal", normalPriorityClass.Name, taskGroupMinResource)
+		highPodConf = createPodConfWithTaskGroup("high", highPriorityClass.Name, taskGroupMinResource)
+
+		var sleepPod, lowPod, normalPod, highPod *v1.Pod
+		By("Create sleep pod to consume queue")
+		sleepPod, err = k8s.InitTestPod(sleepPodConf)
+		Ω(err).NotTo(gomega.HaveOccurred())
+		sleepPod, err = kubeClient.CreatePod(sleepPod, ns)
+		Ω(err).NotTo(gomega.HaveOccurred())
+		err = kubeClient.WaitForPodRunning(ns, sleepPod.Name, 1*time.Minute)
+		Ω(err).NotTo(gomega.HaveOccurred())
+
+		By("Submit low priority job")
+		lowPod, err = k8s.InitTestPod(lowPodConf)
+		Ω(err).NotTo(gomega.HaveOccurred())
+		lowJob := k8s.InitTestJob(lowPod.Labels[constants.LabelApplicationID], 1, 1, lowPod)
+		lowJob, err = kubeClient.CreateJob(lowJob, ns)
+		Ω(err).NotTo(gomega.HaveOccurred())
+		err = kubeClient.WaitForJobPodsCreated(ns, lowJob.Name, 1, 30*time.Second)
+		Ω(err).NotTo(gomega.HaveOccurred())
+
+		By("Submit normal priority job")
+		normalPod, err = k8s.InitTestPod(normalPodConf)
+		Ω(err).NotTo(gomega.HaveOccurred())
+		normalJob := k8s.InitTestJob(normalPod.Labels[constants.LabelApplicationID], 1, 1, normalPod)
+		normalJob, err = kubeClient.CreateJob(normalJob, ns)
+		Ω(err).NotTo(gomega.HaveOccurred())
+		err = kubeClient.WaitForJobPodsCreated(ns, normalJob.Name, 1, 30*time.Second)
+		Ω(err).NotTo(gomega.HaveOccurred())
+
+		By("Submit high priority job")
+		highPod, err = k8s.InitTestPod(highPodConf)
+		Ω(err).NotTo(gomega.HaveOccurred())
+		highJob := k8s.InitTestJob(highPod.Labels[constants.LabelApplicationID], 1, 1, highPod)
+		highJob, err = kubeClient.CreateJob(highJob, ns)
+		Ω(err).NotTo(gomega.HaveOccurred())
+		err = kubeClient.WaitForJobPodsCreated(ns, highJob.Name, 1, 30*time.Second)
+		Ω(err).NotTo(gomega.HaveOccurred())
+
+		By("Wait for scheduler state to settle")
+		time.Sleep(10 * time.Second)
+
+		var lowPods, normalPods, highPods *v1.PodList
+		lowPods, err = kubeClient.ListPods(ns, fmt.Sprintf("job-name=%s", lowJob.Name))
+		Ω(err).NotTo(gomega.HaveOccurred())
+		lowPod = &lowPods.Items[0]
+
+		normalPods, err = kubeClient.ListPods(ns, fmt.Sprintf("job-name=%s", normalJob.Name))
+		Ω(err).NotTo(gomega.HaveOccurred())
+		normalPod = &normalPods.Items[0]
+
+		highPods, err = kubeClient.ListPods(ns, fmt.Sprintf("job-name=%s", highJob.Name))
+		Ω(err).NotTo(gomega.HaveOccurred())
+		highPod = &highPods.Items[0]
+
+		By("Ensure no test pods are running")
+		ensureNotRunning(ns, lowPod, normalPod, highPod)
+
+		By("Kill sleep pod to make room for test pods")
+		err = kubeClient.DeletePod(sleepPod.Name, ns)
+		Ω(err).NotTo(gomega.HaveOccurred())
+
+		By("Wait for high-priority placeholders to be terminated")
+		var tgPlaceHolders map[string][]string
+		tgPlaceHolders = yunikorn.GetPlaceholderNames(highPodConf.Annotations, highPodConf.Labels["applicationId"])
+		for _, phNames := range tgPlaceHolders {
+			for _, ph := range phNames {
+				phTermErr := kubeClient.WaitForPodTerminated(ns, ph, 1*time.Minute)
+				Ω(phTermErr).NotTo(HaveOccurred())
+			}
+		}
+
+		By("Wait for high-priority pod to begin running")
+		err = kubeClient.WaitForPodRunning(ns, highPod.Name, 1*time.Minute)
+		Ω(err).NotTo(gomega.HaveOccurred())
+
+		By("Ensure low and normal priority pods are not running")
+		ensureNotRunning(ns, lowPod, normalPod)
+
+		By("Kill high-priority job")
+		err = kubeClient.DeleteJob(highJob.Name, ns)
+		Ω(err).NotTo(gomega.HaveOccurred())
+
+		By("Wait for normal-priority placeholders to be terminated")
+		tgPlaceHolders = yunikorn.GetPlaceholderNames(normalPodConf.Annotations, normalPodConf.Labels["applicationId"])
+		for _, phNames := range tgPlaceHolders {
+			for _, ph := range phNames {
+				phTermErr := kubeClient.WaitForPodTerminated(ns, ph, 1*time.Minute)
+				Ω(phTermErr).NotTo(HaveOccurred())
+			}
+		}
+
+		By("Wait for normal-priority pod to begin running")
+		err = kubeClient.WaitForPodRunning(ns, normalPod.Name, 1*time.Minute)
+		Ω(err).NotTo(gomega.HaveOccurred())
+
+		By("Ensure low priority pod is not running")
+		ensureNotRunning(ns, lowPod)
+
+		By("Kill normal-priority job")
+		err = kubeClient.DeleteJob(normalJob.Name, ns)
+		Ω(err).NotTo(gomega.HaveOccurred())
+
+		By("Wait for low-priority placeholders to be terminated")
+		tgPlaceHolders = yunikorn.GetPlaceholderNames(lowPodConf.Annotations, lowPodConf.Labels["applicationId"])
+		for _, phNames := range tgPlaceHolders {
+			for _, ph := range phNames {
+				phTermErr := kubeClient.WaitForPodTerminated(ns, ph, 1*time.Minute)
+				Ω(phTermErr).NotTo(HaveOccurred())
+			}
+		}
+
+		By("Wait for low-priority pod to begin running")
+		err = kubeClient.WaitForPodRunning(ns, lowPod.Name, 1*time.Minute)
+		Ω(err).NotTo(gomega.HaveOccurred())
+
+		By("Kill low-priority job")
+		err = kubeClient.DeleteJob(lowJob.Name, ns)
+		Ω(err).NotTo(gomega.HaveOccurred())
+	})
+
 	ginkgo.AfterEach(func() {
 		testDescription := ginkgo.CurrentSpecReport()
 		if testDescription.Failed() {
@@ -355,3 +510,26 @@ func ensureNotRunning(ns string, pods ...*v1.Pod) {
 		Ω(podResult.Status.Phase).ShouldNot(Equal(v1.PodRunning), pod.Name)
 	}
 }
+
+func createPodConfWithTaskGroup(name, priorityClassName string, taskGroupMinResource map[string]resource.Quantity) k8s.TestPodConfig {
+	return k8s.TestPodConfig{
+		Name: fmt.Sprintf("test-%s-priority-%s", name, common.RandSeq(5)),
+		Labels: map[string]string{
+			constants.LabelQueueName:     "root.default",
+			constants.LabelApplicationID: fmt.Sprintf("app-%s-%s", name, common.RandSeq(5))},
+		Namespace: ns,
+		Annotations: &k8s.PodAnnotation{
+			TaskGroupName: "group-" + name,
+			TaskGroups: []interfaces.TaskGroup{
+				{
+					Name:        "group-" + name,
+					MinMember:   int32(1),
+					MinResource: taskGroupMinResource,
+				},
+			},
+		},
+		Resources:         rr,
+		PriorityClassName: priorityClassName,
+		RestartPolicy:     v1.RestartPolicyNever,
+	}
+}

