You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@yunikorn.apache.org by wi...@apache.org on 2023/08/07 05:03:29 UTC

[yunikorn-core] branch master updated: [YUNIKORN-1864] Apply wild card group limit settings (#599)

This is an automated email from the ASF dual-hosted git repository.

wilfreds pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git


The following commit(s) were added to refs/heads/master by this push:
     new 4983e914 [YUNIKORN-1864] Apply wild card group limit settings (#599)
4983e914 is described below

commit 4983e914a5a432b753b54919210027a3749c813a
Author: Manikandan R <ma...@gmail.com>
AuthorDate: Mon Aug 7 14:59:46 2023 +1000

    [YUNIKORN-1864] Apply wild card group limit settings (#599)
    
    For users without any matching group, make sure the wildcard group limit
    is checked when a group is resolved.
    
    Closes: #599
    
    Signed-off-by: Wilfred Spiegelenburg <wi...@apache.org>
---
 pkg/scheduler/ugm/group_tracker.go |  5 +++--
 pkg/scheduler/ugm/manager.go       | 42 ++++++++++++++++++++++++++------------
 pkg/scheduler/ugm/manager_test.go  | 39 ++++++++++++++++++++++++++++++++++-
 pkg/scheduler/ugm/queue_tracker.go | 25 ++++++++++++-----------
 pkg/scheduler/ugm/user_tracker.go  | 10 ++++-----
 5 files changed, 88 insertions(+), 33 deletions(-)

diff --git a/pkg/scheduler/ugm/group_tracker.go b/pkg/scheduler/ugm/group_tracker.go
index a48e9b22..ff410dc2 100644
--- a/pkg/scheduler/ugm/group_tracker.go
+++ b/pkg/scheduler/ugm/group_tracker.go
@@ -21,6 +21,7 @@ package ugm
 import (
 	"sync"
 
+	"github.com/apache/yunikorn-core/pkg/common"
 	"github.com/apache/yunikorn-core/pkg/common/resources"
 	"github.com/apache/yunikorn-core/pkg/webservice/dao"
 )
@@ -93,7 +94,7 @@ func (gt *GroupTracker) GetGroupResourceUsageDAOInfo() *dao.GroupResourceUsageDA
 	for app := range gt.applications {
 		groupResourceUsage.Applications = append(groupResourceUsage.Applications, app)
 	}
-	groupResourceUsage.Queues = gt.queueTracker.getResourceUsageDAOInfo("")
+	groupResourceUsage.Queues = gt.queueTracker.getResourceUsageDAOInfo(common.Empty)
 	return groupResourceUsage
 }
 
@@ -130,7 +131,7 @@ func (gt *GroupTracker) removeApp(applicationID string) {
 
 func (gt *GroupTracker) getName() string {
 	if gt == nil {
-		return ""
+		return common.Empty
 	}
 	return gt.groupName
 }
diff --git a/pkg/scheduler/ugm/manager.go b/pkg/scheduler/ugm/manager.go
index 0f865ed9..d41b8220 100644
--- a/pkg/scheduler/ugm/manager.go
+++ b/pkg/scheduler/ugm/manager.go
@@ -78,7 +78,7 @@ func (m *Manager) IncreaseTrackedResource(queuePath, applicationID string, usage
 		zap.String("queue path", queuePath),
 		zap.String("application", applicationID),
 		zap.Stringer("resource", usage))
-	if queuePath == "" || applicationID == "" || usage == nil || user.User == "" {
+	if queuePath == common.Empty || applicationID == common.Empty || usage == nil || user.User == common.Empty {
 		log.Log(log.SchedUGM).Debug("Mandatory parameters are missing to increase the resource usage",
 			zap.String("user", user.User),
 			zap.String("queue path", queuePath),
@@ -110,7 +110,7 @@ func (m *Manager) DecreaseTrackedResource(queuePath, applicationID string, usage
 		zap.String("application", applicationID),
 		zap.Stringer("resource", usage),
 		zap.Bool("removeApp", removeApp))
-	if queuePath == "" || applicationID == "" || usage == nil || user.User == "" {
+	if queuePath == common.Empty || applicationID == common.Empty || usage == nil || user.User == common.Empty {
 		log.Log(log.SchedUGM).Debug("Mandatory parameters are missing to decrease the resource usage",
 			zap.String("user", user.User),
 			zap.String("queue path", queuePath),
@@ -236,7 +236,19 @@ func (m *Manager) ensureGroupTrackerForApp(queuePath string, applicationID strin
 	if !userTracker.hasGroupForApp(applicationID) {
 		var groupTracker *GroupTracker
 		group := m.internalEnsureGroup(user, queuePath)
-		if group != "" {
+
+		// Use wild card group (if configured) for users doesn't have any matching group
+		if group == common.Empty {
+			parentQueuePath := queuePath
+			for parentQueuePath != common.Empty {
+				parentQueuePath, _ = getParentQueuePath(parentQueuePath)
+				if _, ok := m.groupWildCardLimitsConfig[parentQueuePath]; ok {
+					group = common.Wildcard
+					break
+				}
+			}
+		}
+		if group != common.Empty {
 			if m.groupTrackers[group] == nil {
 				log.Log(log.SchedUGM).Debug("Group tracker doesn't exists. Creating group tracker",
 					zap.String("application", applicationID),
@@ -257,6 +269,10 @@ func (m *Manager) ensureGroupTrackerForApp(queuePath string, applicationID strin
 		userTracker.setGroupForApp(applicationID, groupTracker)
 		return group
 	} else {
+		// In case of any group changes (for example, earlier parent queue or wild card group used for the app
+		// group mapping. Now, new config has group limits in leaf queue itself) for specific user because of recent config change,
+		// there won't be effect to existing app group mapping. Already mapped group only would be used for the whole application lifecycle.
+		// Config change would be used only for new applications.
 		return userTracker.getGroupForApp(applicationID)
 	}
 }
@@ -288,12 +304,12 @@ func (m *Manager) internalEnsureGroup(user security.UserGroup, queuePath string)
 			}
 		}
 		parentQueuePath, _ := getParentQueuePath(queuePath)
-		if parentQueuePath != "" {
+		if parentQueuePath != common.Empty {
 			qt := userTracker.queueTracker.getChildQueueTracker(parentQueuePath)
 			return m.internalEnsureGroup(user, qt.queuePath)
 		}
 	}
-	return ""
+	return common.Empty
 }
 
 func (m *Manager) isUserRemovable(ut *UserTracker) bool {
@@ -337,7 +353,7 @@ func (m *Manager) internalProcessConfig(cur configs.QueueConfig, queuePath strin
 		}
 		limitConfig := &LimitConfig{maxResources: maxResource, maxApplications: limit.MaxApplications}
 		for _, user := range limit.Users {
-			if user == "" {
+			if user == common.Empty {
 				continue
 			}
 			log.Log(log.SchedUGM).Debug("Processing user limits configuration",
@@ -355,7 +371,7 @@ func (m *Manager) internalProcessConfig(cur configs.QueueConfig, queuePath strin
 			}
 		}
 		for _, group := range limit.Groups {
-			if group == "" {
+			if group == common.Empty {
 				continue
 			}
 			log.Log(log.SchedUGM).Debug("Processing group limits configuration",
@@ -364,14 +380,14 @@ func (m *Manager) internalProcessConfig(cur configs.QueueConfig, queuePath strin
 				zap.String("queue path", queuePath),
 				zap.Uint64("max application", limit.MaxApplications),
 				zap.Any("max resources", limit.MaxResources))
-			if group == common.Wildcard {
-				m.groupWildCardLimitsConfig[queuePath] = limitConfig
-				continue
-			}
 			if err := m.processGroupConfig(group, limitConfig, queuePath, groupLimits); err != nil {
 				return err
 			}
-			m.configuredGroups[queuePath] = append(m.configuredGroups[queuePath], group)
+			if group == common.Wildcard {
+				m.groupWildCardLimitsConfig[queuePath] = limitConfig
+			} else {
+				m.configuredGroups[queuePath] = append(m.configuredGroups[queuePath], group)
+			}
 		}
 	}
 	if err := m.clearEarlierSetLimits(userLimits, groupLimits, queuePath); err != nil {
@@ -606,7 +622,7 @@ func (m *Manager) Headroom(queuePath string, user security.UserGroup) *resources
 			zap.String("user headroom", userHeadroom.String()))
 	}
 	group := m.internalEnsureGroup(user, queuePath)
-	if group != "" {
+	if group != common.Empty {
 		if m.groupTrackers[group] != nil {
 			groupHeadroom = m.groupTrackers[group].headroom(queuePath)
 			log.Log(log.SchedUGM).Debug("Calculated headroom for group",
diff --git a/pkg/scheduler/ugm/manager_test.go b/pkg/scheduler/ugm/manager_test.go
index 40482361..c56a0730 100644
--- a/pkg/scheduler/ugm/manager_test.go
+++ b/pkg/scheduler/ugm/manager_test.go
@@ -393,7 +393,7 @@ func TestUpdateConfigWithWildCardUsersAndGroups(t *testing.T) {
 
 	// configure max resource for user2 * root.parent (map[memory:70 vcores:70]) higher than wild card user * root.parent settings (map[memory:10 vcores:10])
 	// ensure user's specific settings has been used for enforcement checks as specific limits always has higher precedence when compared to wild card user limit settings
-	conf = createUpdateConfigWithWildCardUsersAndGroups(user1.User, "", "*", "*", "10", "10")
+	conf = createUpdateConfigWithWildCardUsersAndGroups(user1.User, "", "*", "", "10", "10")
 	assert.NilError(t, manager.UpdateConfig(conf.Queues[0], "root"))
 
 	// can be allowed to run upto resource usage map[memory:70 vcores:70]
@@ -409,6 +409,43 @@ func TestUpdateConfigWithWildCardUsersAndGroups(t *testing.T) {
 	if increased {
 		t.Fatalf("unable to increase tracked resource: queuepath %s, app %s, res %v", queuePath1, TestApp1, user1)
 	}
+
+	user3 := security.UserGroup{User: "user3", Groups: []string{"group3"}}
+	conf = createUpdateConfigWithWildCardUsersAndGroups(user1.User, user1.Groups[0], "", "*", "10", "10")
+	assert.NilError(t, manager.UpdateConfig(conf.Queues[0], "root"))
+
+	// user3 should be able to run app as group3 uses wild card group limit settings map[memory:10 vcores:10]
+	increased = manager.IncreaseTrackedResource(queuePath1, TestApp1, usage, user3)
+	if !increased {
+		t.Fatalf("unable to increase tracked resource: queuepath %s, app %s, res %v", queuePath1, TestApp1, user)
+	}
+
+	// user4 (though belongs to different group, group4) should not be able to run app as group4 also
+	// uses wild card group limit settings map[memory:10 vcores:10]
+	user4 := security.UserGroup{User: "user4", Groups: []string{"group4"}}
+	increased = manager.IncreaseTrackedResource(queuePath1, TestApp1, usage, user4)
+	assert.Equal(t, increased, false)
+
+	conf = createUpdateConfigWithWildCardUsersAndGroups(user4.User, user4.Groups[0], "", "*", "10", "10")
+	assert.NilError(t, manager.UpdateConfig(conf.Queues[0], "root"))
+
+	// Since app is TestApp1, gt of "*" would be used as it is already mapped. group4 won't be used
+	increased = manager.IncreaseTrackedResource(queuePath1, TestApp1, usage, user4)
+	if increased {
+		t.Fatalf("unable to increase tracked resource: queuepath %s, app %s, res %v", queuePath1, TestApp1, user1)
+	}
+
+	// Now group4 would be used as user4 is running TestApp2 for the first time. So can be allowed to run upto resource usage map[memory:70 vcores:70]
+	for i := 1; i <= 7; i++ {
+		increased = manager.IncreaseTrackedResource(queuePath1, TestApp2, usage, user4)
+		if !increased {
+			t.Fatalf("unable to increase tracked resource: queuepath %s, app %s, res %v", queuePath1, TestApp1, user1)
+		}
+	}
+
+	// user4 should not be able to run app as user4 max limit is map[memory:70 vcores:70] and usage so far is map[memory:70 vcores:70]
+	increased = manager.IncreaseTrackedResource(queuePath1, TestApp1, usage, user4)
+	assert.Equal(t, increased, false)
 }
 
 func TestUpdateConfigClearEarlierSetLimits(t *testing.T) {
diff --git a/pkg/scheduler/ugm/queue_tracker.go b/pkg/scheduler/ugm/queue_tracker.go
index acc94ba1..c793ffc0 100644
--- a/pkg/scheduler/ugm/queue_tracker.go
+++ b/pkg/scheduler/ugm/queue_tracker.go
@@ -21,6 +21,7 @@ package ugm
 import (
 	"go.uber.org/zap"
 
+	"github.com/apache/yunikorn-core/pkg/common"
 	"github.com/apache/yunikorn-core/pkg/common/configs"
 	"github.com/apache/yunikorn-core/pkg/common/resources"
 	"github.com/apache/yunikorn-core/pkg/log"
@@ -38,13 +39,13 @@ type QueueTracker struct {
 }
 
 func newRootQueueTracker() *QueueTracker {
-	qt := newQueueTracker("", configs.RootQueue)
+	qt := newQueueTracker(common.Empty, configs.RootQueue)
 	return qt
 }
 
 func newQueueTracker(queuePath string, queueName string) *QueueTracker {
 	qp := queueName
-	if queuePath != "" {
+	if queuePath != common.Empty {
 		qp = queuePath + "." + queueName
 	}
 	queueTracker := &QueueTracker{
@@ -136,7 +137,7 @@ func (qt *QueueTracker) increaseTrackedResource(queuePath string, applicationID
 	}
 
 	childQueuePath, immediateChildQueueName := getChildQueuePath(queuePath)
-	if childQueuePath != "" {
+	if childQueuePath != common.Empty {
 		if qt.childQueueTrackers[immediateChildQueueName] == nil {
 			qt.childQueueTrackers[immediateChildQueueName] = newQueueTracker(qt.queuePath, immediateChildQueueName)
 		}
@@ -170,7 +171,7 @@ func (qt *QueueTracker) decreaseTrackedResource(queuePath string, applicationID
 		zap.Stringer("resource", usage),
 		zap.Bool("removeApp", removeApp))
 	childQueuePath, immediateChildQueueName := getChildQueuePath(queuePath)
-	if childQueuePath != "" {
+	if childQueuePath != common.Empty {
 		if qt.childQueueTrackers[immediateChildQueueName] == nil {
 			log.Log(log.SchedUGM).Error("Child queueTracker tracker must be available in child queues map",
 				zap.String("child queueTracker name", immediateChildQueueName))
@@ -217,8 +218,8 @@ func (qt *QueueTracker) getChildQueueTracker(queuePath string) *QueueTracker {
 	var childQueuePath, immediateChildQueueName string
 	childQueuePath, immediateChildQueueName = getChildQueuePath(queuePath)
 	childQueueTracker := qt
-	if childQueuePath != "" {
-		for childQueuePath != "" {
+	if childQueuePath != common.Empty {
+		for childQueuePath != common.Empty {
 			if childQueueTracker != nil {
 				if len(childQueueTracker.childQueueTrackers) == 0 || childQueueTracker.childQueueTrackers[immediateChildQueueName] == nil {
 					newChildQt := newQueueTracker(qt.queuePath, immediateChildQueueName)
@@ -248,7 +249,7 @@ func (qt *QueueTracker) headroom(queuePath string) *resources.Resource {
 	log.Log(log.SchedUGM).Debug("Calculating headroom",
 		zap.String("queue path", queuePath))
 	childQueuePath, immediateChildQueueName := getChildQueuePath(queuePath)
-	if childQueuePath != "" {
+	if childQueuePath != common.Empty {
 		if qt.childQueueTrackers[immediateChildQueueName] != nil {
 			headroom := qt.childQueueTrackers[immediateChildQueueName].headroom(childQueuePath)
 			if headroom != nil {
@@ -279,7 +280,7 @@ func (qt *QueueTracker) getResourceUsageDAOInfo(parentQueuePath string) *dao.Res
 		return &dao.ResourceUsageDAOInfo{}
 	}
 	fullQueuePath := parentQueuePath + "." + qt.queueName
-	if parentQueuePath == "" {
+	if parentQueuePath == common.Empty {
 		fullQueuePath = qt.queueName
 	}
 	usage := &dao.ResourceUsageDAOInfo{
@@ -305,7 +306,7 @@ func (qt *QueueTracker) IsQueuePathTrackedCompletely(queuePath string) bool {
 		return true
 	}
 	childQueuePath, immediateChildQueueName := getChildQueuePath(queuePath)
-	if immediateChildQueueName != "" {
+	if immediateChildQueueName != common.Empty {
 		if childUt, ok := qt.childQueueTrackers[immediateChildQueueName]; ok {
 			return childUt.IsQueuePathTrackedCompletely(childQueuePath)
 		}
@@ -319,7 +320,7 @@ func (qt *QueueTracker) IsQueuePathTrackedCompletely(queuePath string) bool {
 // the leaf and its parent queue using UnlinkQT method. Otherwise, we should leave as it is.
 func (qt *QueueTracker) IsUnlinkRequired(queuePath string) bool {
 	childQueuePath, immediateChildQueueName := getChildQueuePath(queuePath)
-	if immediateChildQueueName != "" {
+	if immediateChildQueueName != common.Empty {
 		if childUt, ok := qt.childQueueTrackers[immediateChildQueueName]; ok {
 			return childUt.IsUnlinkRequired(childQueuePath)
 		}
@@ -344,13 +345,13 @@ func (qt *QueueTracker) UnlinkQT(queuePath string) bool {
 		zap.Int("no. of child queue trackers", len(qt.childQueueTrackers)))
 	childQueuePath, immediateChildQueueName := getChildQueuePath(queuePath)
 
-	if childQueuePath == "" && len(qt.childQueueTrackers) > 0 {
+	if childQueuePath == common.Empty && len(qt.childQueueTrackers) > 0 {
 		for qName := range qt.childQueueTrackers {
 			qt.UnlinkQT(qt.queueName + configs.DOT + qName)
 		}
 	}
 
-	if childQueuePath != "" {
+	if childQueuePath != common.Empty {
 		if qt.childQueueTrackers[immediateChildQueueName] != nil {
 			unlink := qt.childQueueTrackers[immediateChildQueueName].UnlinkQT(childQueuePath)
 			if unlink {
diff --git a/pkg/scheduler/ugm/user_tracker.go b/pkg/scheduler/ugm/user_tracker.go
index 17353424..0dcb4b95 100644
--- a/pkg/scheduler/ugm/user_tracker.go
+++ b/pkg/scheduler/ugm/user_tracker.go
@@ -23,6 +23,7 @@ import (
 
 	"go.uber.org/zap"
 
+	"github.com/apache/yunikorn-core/pkg/common"
 	"github.com/apache/yunikorn-core/pkg/common/resources"
 	"github.com/apache/yunikorn-core/pkg/log"
 	"github.com/apache/yunikorn-core/pkg/webservice/dao"
@@ -66,11 +67,10 @@ func (ut *UserTracker) increaseTrackedResource(queuePath, applicationID string,
 		if !increasedGroupUsage {
 			_, decreased := ut.queueTracker.decreaseTrackedResource(queuePath, applicationID, usage, false)
 			if !decreased {
-				log.Log(log.SchedUGM).Error("Problem in increasing group resource usage. Hence tried to rollback user tracked usage, but failed to do.",
+				log.Log(log.SchedUGM).Error("User resource usage rollback has failed",
 					zap.String("queue path", queuePath),
 					zap.String("application", applicationID),
-					zap.String("user", ut.userName),
-					zap.Stringer("usage", usage))
+					zap.String("user", ut.userName))
 			}
 		}
 		return increasedGroupUsage
@@ -106,7 +106,7 @@ func (ut *UserTracker) getGroupForApp(applicationID string) string {
 	if ut.appGroupTrackers[applicationID] != nil {
 		return ut.appGroupTrackers[applicationID].groupName
 	}
-	return ""
+	return common.Empty
 }
 
 func (ut *UserTracker) getTrackedApplications() map[string]*GroupTracker {
@@ -139,7 +139,7 @@ func (ut *UserTracker) GetUserResourceUsageDAOInfo() *dao.UserResourceUsageDAOIn
 			userResourceUsage.Groups[app] = gt.groupName
 		}
 	}
-	userResourceUsage.Queues = ut.queueTracker.getResourceUsageDAOInfo("")
+	userResourceUsage.Queues = ut.queueTracker.getResourceUsageDAOInfo(common.Empty)
 	return userResourceUsage
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@yunikorn.apache.org
For additional commands, e-mail: issues-help@yunikorn.apache.org