You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@yunikorn.apache.org by ma...@apache.org on 2024/01/18 06:00:05 UTC
(yunikorn-core) branch master updated: [YUNIKORN-2209] Remove limit checks in QueueTracker (#758)
This is an automated email from the ASF dual-hosted git repository.
mani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/master by this push:
new 1f5f1c91 [YUNIKORN-2209] Remove limit checks in QueueTracker (#758)
1f5f1c91 is described below
commit 1f5f1c912db06dc3141fe086b022f82fcc7f1be8
Author: Manikandan R <ma...@gmail.com>
AuthorDate: Thu Jan 18 11:29:48 2024 +0530
[YUNIKORN-2209] Remove limit checks in QueueTracker (#758)
Closes: #758
Signed-off-by: Manikandan R <ma...@gmail.com>
---
pkg/scheduler/ugm/group_tracker.go | 8 +-
pkg/scheduler/ugm/group_tracker_test.go | 4 +-
pkg/scheduler/ugm/manager.go | 91 +++++++++----
pkg/scheduler/ugm/manager_test.go | 223 +++++++++++++++++++++++---------
pkg/scheduler/ugm/queue_tracker.go | 135 +++++--------------
pkg/scheduler/ugm/queue_tracker_test.go | 30 ++---
pkg/scheduler/ugm/user_tracker.go | 10 +-
pkg/scheduler/ugm/user_tracker_test.go | 8 +-
8 files changed, 290 insertions(+), 219 deletions(-)
diff --git a/pkg/scheduler/ugm/group_tracker.go b/pkg/scheduler/ugm/group_tracker.go
index 6987d5cf..85cdd30e 100644
--- a/pkg/scheduler/ugm/group_tracker.go
+++ b/pkg/scheduler/ugm/group_tracker.go
@@ -34,10 +34,10 @@ type GroupTracker struct {
sync.RWMutex
}
-func newGroupTracker(group string) *GroupTracker {
- queueTracker := newRootQueueTracker()
+func newGroupTracker(groupName string) *GroupTracker {
+ queueTracker := newRootQueueTracker(group)
groupTracker := &GroupTracker{
- groupName: group,
+ groupName: groupName,
applications: make(map[string]string),
queueTracker: queueTracker,
}
@@ -75,7 +75,7 @@ func (gt *GroupTracker) getTrackedApplications() map[string]string {
func (gt *GroupTracker) setLimits(hierarchy []string, resource *resources.Resource, maxApps uint64) {
gt.Lock()
defer gt.Unlock()
- gt.queueTracker.setLimit(hierarchy, resource, maxApps)
+ gt.queueTracker.setLimit(hierarchy, resource, maxApps, false, group, false)
}
func (gt *GroupTracker) headroom(hierarchy []string) *resources.Resource {
diff --git a/pkg/scheduler/ugm/group_tracker_test.go b/pkg/scheduler/ugm/group_tracker_test.go
index c969113a..a9998ab2 100644
--- a/pkg/scheduler/ugm/group_tracker_test.go
+++ b/pkg/scheduler/ugm/group_tracker_test.go
@@ -32,7 +32,7 @@ func TestGTIncreaseTrackedResource(t *testing.T) {
// root->parent->child1->child12
// root->parent->child2
// root->parent->child12 (similar name like above leaf queue, but it is being treated differently as similar names are allowed)
- GetUserManager()
+ manager := GetUserManager()
user := &security.UserGroup{User: "test", Groups: []string{"test"}}
groupTracker := newGroupTracker(user.User)
@@ -40,6 +40,8 @@ func TestGTIncreaseTrackedResource(t *testing.T) {
if err != nil {
t.Errorf("new resource create returned error or wrong resource: error %t, res %v", err, usage1)
}
+
+ manager.Headroom(queuePath1, TestApp1, *user)
result := groupTracker.increaseTrackedResource(hierarchy1, TestApp1, usage1, user.User)
if !result {
t.Fatalf("unable to increase tracked resource: queuepath %+q, app %s, res %v", hierarchy1, TestApp1, usage1)
diff --git a/pkg/scheduler/ugm/manager.go b/pkg/scheduler/ugm/manager.go
index 47f0fb1b..a4cff214 100644
--- a/pkg/scheduler/ugm/manager.go
+++ b/pkg/scheduler/ugm/manager.go
@@ -281,20 +281,6 @@ func (m *Manager) ensureGroupInternal(userGroups []string, queuePath string) str
return m.ensureGroupInternal(userGroups, parentPath)
}
-func (m *Manager) isUserRemovable(ut *UserTracker) bool {
- if len(ut.getTrackedApplications()) == 0 && resources.IsZero(ut.queueTracker.resourceUsage) {
- return true
- }
- return false
-}
-
-func (m *Manager) isGroupRemovable(gt *GroupTracker) bool {
- if len(gt.getTrackedApplications()) == 0 && resources.IsZero(gt.queueTracker.resourceUsage) {
- return true
- }
- return false
-}
-
func (m *Manager) UpdateConfig(config configs.QueueConfig, queuePath string) error {
userWildCardLimitsConfig := make(map[string]*LimitConfig)
groupWildCardLimitsConfig := make(map[string]*LimitConfig)
@@ -311,6 +297,12 @@ func (m *Manager) UpdateConfig(config configs.QueueConfig, queuePath string) err
// compare existing config with new configs stored in above temporary maps
m.clearEarlierSetLimits(userLimits, groupLimits)
+ // compare existing wild card user config with new configs stored in above temporary maps
+ m.clearEarlierSetUserWildCardLimits(userWildCardLimitsConfig, userLimits)
+
+ // apply wild card user limits to all existing users for which no limits configured explicitly
+ m.applyWildCardUserLimits(userWildCardLimitsConfig, userLimits)
+
// switch over - replace the existing config with new configs
m.replaceLimitConfigs(userLimits, groupLimits, userWildCardLimitsConfig, groupWildCardLimitsConfig, configuredGroups)
@@ -401,6 +393,62 @@ func (m *Manager) clearEarlierSetLimits(newUserLimits map[string]map[string]*Lim
m.clearEarlierSetUserLimits(newUserLimits)
}
+// clearEarlierSetUserWildCardLimits Traverse new wild card user config and decide whether earlier usage needs to be cleared/updated or not
+// by comparing with the existing config. If config set earlier but not now, then traverse all users, check whether wild card limit has been applied/used or not.
+// Reset earlier settings for the users only when wild card user limit has been applied.
+// If config set earlier and now as well, then traverse all users and apply the current wild card user limit configs
+// only when wild card user limit has been applied earlier.
+func (m *Manager) clearEarlierSetUserWildCardLimits(newUserWildCardLimits map[string]*LimitConfig, newUserLimits map[string]map[string]*LimitConfig) {
+ m.RLock()
+ defer m.RUnlock()
+ for queuePath, currentLimitConfig := range m.userWildCardLimitsConfig {
+ hierarchy := strings.Split(queuePath, configs.DOT)
+ _, currentQPExists := m.userLimits[queuePath]
+ _, newQPExists := newUserLimits[queuePath]
+
+ // Does queue path exist? In case wild limit does not exist, reset limit settings and useWildCard flag for all those users
+ if newLimitConfig, ok := newUserWildCardLimits[queuePath]; !ok && (!currentQPExists || !newQPExists) {
+ for _, ut := range m.userTrackers {
+ _, exists := m.userLimits[queuePath][ut.userName]
+ if _, ok = newUserLimits[queuePath][ut.userName]; !ok || !exists {
+ log.Log(log.SchedUGM).Debug("Need to clear earlier set configs for user because wild card limit has been applied earlier",
+ zap.String("user", ut.userName),
+ zap.String("queue path", queuePath))
+ ut.setLimits(hierarchy, nil, 0, false, true)
+ }
+ }
+ } else if !currentQPExists || !newQPExists {
+ // In case wild card user limit exists, compare the old wild card limits with new limits for existing users already using wild card limits.
+ // In case of difference, set new limits for all those users.
+ if currentLimitConfig.maxApplications != newLimitConfig.maxApplications ||
+ !resources.Equals(currentLimitConfig.maxResources, newLimitConfig.maxResources) {
+ for _, ut := range m.userTrackers {
+ log.Log(log.SchedUGM).Debug("Need to update earlier set configs for user because wild card limit applied earlier has been updated",
+ zap.String("user", ut.userName),
+ zap.String("queue path", queuePath))
+ _, exists := m.userLimits[queuePath][ut.userName]
+ if _, ok = newUserLimits[queuePath][ut.userName]; !ok || !exists {
+ ut.setLimits(hierarchy, newLimitConfig.maxResources, newLimitConfig.maxApplications, true, true)
+ }
+ }
+ }
+ }
+ }
+}
+
+// applyWildCardUserLimits Traverse new wild card user config and apply the limits for all existing users for which no limits configured explicitly
+func (m *Manager) applyWildCardUserLimits(newUserWildCardLimits map[string]*LimitConfig, newUserLimits map[string]map[string]*LimitConfig) {
+ m.RLock()
+ defer m.RUnlock()
+ for queuePath, newLimitConfig := range newUserWildCardLimits {
+ for _, ut := range m.userTrackers {
+ if _, ok := newUserLimits[queuePath][ut.userName]; !ok {
+ ut.setLimits(strings.Split(queuePath, "."), newLimitConfig.maxResources, newLimitConfig.maxApplications, true, false)
+ }
+ }
+ }
+}
+
// clearEarlierSetUserLimits Traverse new user config and decide whether earlier usage needs to be cleared or not
// by comparing with the existing config. Reset earlier usage only config set earlier but not now
func (m *Manager) clearEarlierSetUserLimits(newUserLimits map[string]map[string]*LimitConfig) {
@@ -435,7 +483,7 @@ func (m *Manager) resetUserEarlierUsage(ut *UserTracker, hierarchy []string) {
log.Log(log.SchedUGM).Debug("Need to clear earlier set configs for user",
zap.String("user", ut.userName),
zap.Strings("queue path", hierarchy))
- ut.setLimits(hierarchy, nil, 0)
+ ut.setLimits(hierarchy, nil, 0, false, false)
// Is there any running applications in end queue of this queue path? If not, then remove the linkage between end queue and its immediate parent
if ut.IsUnlinkRequired(hierarchy) {
ut.UnlinkQT(hierarchy)
@@ -529,7 +577,7 @@ func (m *Manager) setUserLimits(user string, limitConfig *LimitConfig, hierarchy
userTracker = newUserTracker(user)
m.userTrackers[user] = userTracker
}
- userTracker.setLimits(hierarchy, limitConfig.maxResources, limitConfig.maxApplications)
+ userTracker.setLimits(hierarchy, limitConfig.maxResources, limitConfig.maxApplications, false, false)
return nil
}
@@ -570,23 +618,12 @@ func (m *Manager) getUserTracker(user string) *UserTracker {
}
func (m *Manager) getUserWildCardLimitsConfig(queuePath string) *LimitConfig {
- m.RLock()
- defer m.RUnlock()
if config, ok := m.userWildCardLimitsConfig[queuePath]; ok {
return config
}
return nil
}
-func (m *Manager) getGroupWildCardLimitsConfig(queuePath string) *LimitConfig {
- m.RLock()
- defer m.RUnlock()
- if config, ok := m.groupWildCardLimitsConfig[queuePath]; ok {
- return config
- }
- return nil
-}
-
// Headroom calculates the headroom for this specific application that runs as the user and group.
func (m *Manager) Headroom(queuePath, applicationID string, user security.UserGroup) *resources.Resource {
hierarchy := strings.Split(queuePath, configs.DOT)
diff --git a/pkg/scheduler/ugm/manager_test.go b/pkg/scheduler/ugm/manager_test.go
index 23a3c6e8..8749d117 100644
--- a/pkg/scheduler/ugm/manager_test.go
+++ b/pkg/scheduler/ugm/manager_test.go
@@ -205,11 +205,8 @@ func TestAddRemoveUserAndGroups(t *testing.T) {
t.Errorf("unable to increase tracked resource. queuepath: %s, application id: %s, resource usage: %s, user: %s", queuePath1, TestApp1, usage1.String(), user.User)
}
- userTrackers := manager.GetUsersResources()
- userTracker := userTrackers[0]
groupTrackers := manager.GetGroupsResources()
assert.Equal(t, len(groupTrackers), 0)
- assert.Equal(t, false, manager.isUserRemovable(userTracker))
assertUGM(t, user, usage1, 1)
assert.Equal(t, user.User, manager.GetUserTracker(user.User).userName)
assert.Equal(t, manager.GetGroupTracker(user.Groups[0]) == nil, true)
@@ -307,28 +304,19 @@ func TestUpdateConfig(t *testing.T) {
}
}
- userTrackers := manager.GetUsersResources()
- userTracker := userTrackers[0]
- groupTrackers := manager.GetGroupsResources()
- groupTracker := groupTrackers[0]
- assert.Equal(t, false, manager.isUserRemovable(userTracker))
- assert.Equal(t, false, manager.isGroupRemovable(groupTracker))
-
// configure max resource for root.parent lesser than current resource usage. should be allowed to set but user cannot be allowed to do any activity further
conf = createConfig(user.User, user.Groups[0], "memory", "50", 40, 4)
err = manager.UpdateConfig(conf.Queues[0], "root")
assert.NilError(t, err)
- increased := manager.IncreaseTrackedResource(queuePath1, TestApp1, usage, user)
- if increased {
- t.Fatalf("unable to increase tracked resource: queuepath %s, app %s, res %v", queuePath1, TestApp1, user)
- }
+ headroom := manager.Headroom(queuePath1, TestApp1, user)
+ assert.Equal(t, headroom.FitInMaxUndef(usage), false)
// configure max resource for root and parent to allow one more application to run
conf = createConfig(user.User, user.Groups[0], "memory", "50", 60, 6)
err = manager.UpdateConfig(conf.Queues[0], "root")
assert.NilError(t, err, "unable to set the limit for user user1 because current resource usage is greater than config max resource for root.parent")
- increased = manager.IncreaseTrackedResource(queuePath1, TestApp1, usage, user)
+ increased := manager.IncreaseTrackedResource(queuePath1, TestApp1, usage, user)
if !increased {
t.Fatalf("unable to increase tracked resource: queuepath %s, app %s, res %v", queuePath1, TestApp1, user)
}
@@ -337,10 +325,93 @@ func TestUpdateConfig(t *testing.T) {
conf = createConfig(user.User, user.Groups[0], "memory", "50", 10, 10)
err = manager.UpdateConfig(conf.Queues[0], "root")
assert.NilError(t, err)
- increased = manager.IncreaseTrackedResource(queuePath1, TestApp1, usage, user)
- if increased {
- t.Fatalf("unable to increase tracked resource: queuepath %s, app %s, res %v", queuePath1, TestApp1, user)
+ headroom = manager.Headroom(queuePath1, TestApp1, user)
+ assert.Equal(t, headroom.FitInMaxUndef(usage), false)
+}
+
+func TestUseWildCard(t *testing.T) {
+ setupUGM()
+ manager := GetUserManager()
+ user := security.UserGroup{User: "user1", Groups: []string{"group1"}}
+
+ expectedResource, err := resources.NewResourceFromConf(map[string]string{"memory": "50", "vcores": "50"})
+ if err != nil {
+ t.Errorf("new resource create returned error or wrong resource: error %t, res %v", err, expectedResource)
+ }
+ usage, err := resources.NewResourceFromConf(map[string]string{"memory": "10", "vcores": "10"})
+ if err != nil {
+ t.Errorf("new resource create returned error or wrong resource: error %t, res %v", err, usage)
+ }
+
+ expectedHeadroom, err := resources.NewResourceFromConf(map[string]string{"memory": "50", "vcores": "50"})
+ if err != nil {
+ t.Errorf("new resource create returned error or wrong resource: error %t, res %v", err, expectedHeadroom)
}
+
+ user1 := security.UserGroup{User: "user2", Groups: []string{"group2"}}
+ conf := createUpdateConfigWithWildCardUsersAndGroups(user1.User, user1.Groups[0], "*", "*", "50", "50")
+ assert.NilError(t, manager.UpdateConfig(conf.Queues[0], "root"))
+
+ // user1 fallback on wild card user limit. user1 max resources and max applications would be overwritten with wild card user limit settings
+ headroom := manager.Headroom(queuePath1, TestApp1, user)
+ assert.Equal(t, resources.Equals(headroom, expectedHeadroom), true)
+
+ // user2 has its own settings, so doesn't fallback on wild card user limit.
+ headroom = manager.Headroom(queuePath1, TestApp1, user1)
+ assert.Equal(t, resources.Equals(headroom, resources.Multiply(usage, 7)), true)
+
+ // user1 uses wild card user limit settings.
+ assert.Equal(t, manager.GetUserTracker(user.User).queueTracker.maxRunningApps, uint64(0))
+ assert.Equal(t, manager.GetUserTracker(user.User).queueTracker.childQueueTrackers["parent"].maxRunningApps, uint64(10))
+ assert.Equal(t, manager.GetUserTracker(user.User).queueTracker.childQueueTrackers["parent"].childQueueTrackers["child1"].maxRunningApps, uint64(0))
+ assert.Equal(t, resources.Equals(manager.GetUserTracker(user.User).queueTracker.maxResources, nil), true)
+ assert.Equal(t, resources.Equals(manager.GetUserTracker(user.User).queueTracker.childQueueTrackers["parent"].maxResources, expectedHeadroom), true)
+ assert.Equal(t, resources.Equals(manager.GetUserTracker(user.User).queueTracker.childQueueTrackers["parent"].childQueueTrackers["child1"].maxResources, nil), true)
+ assert.Equal(t, manager.GetUserTracker(user.User).queueTracker.useWildCard, false)
+ assert.Equal(t, manager.GetUserTracker(user.User).queueTracker.childQueueTrackers["parent"].useWildCard, true)
+ assert.Equal(t, manager.GetUserTracker(user.User).queueTracker.childQueueTrackers["parent"].childQueueTrackers["child1"].useWildCard, false)
+
+ // user2 uses its own settings.
+ assert.Equal(t, manager.GetUserTracker(user1.User).queueTracker.maxRunningApps, uint64(20))
+ assert.Equal(t, manager.GetUserTracker(user1.User).queueTracker.childQueueTrackers["parent"].maxRunningApps, uint64(10))
+ assert.Equal(t, manager.GetUserTracker(user1.User).queueTracker.childQueueTrackers["parent"].childQueueTrackers["child1"].maxRunningApps, uint64(0))
+ assert.Equal(t, resources.Equals(manager.GetUserTracker(user1.User).queueTracker.maxResources, resources.Multiply(usage, 14)), true)
+ assert.Equal(t, resources.Equals(manager.GetUserTracker(user1.User).queueTracker.childQueueTrackers["parent"].maxResources, resources.Multiply(usage, 7)), true)
+ assert.Equal(t, resources.Equals(manager.GetUserTracker(user1.User).queueTracker.childQueueTrackers["parent"].childQueueTrackers["child1"].maxResources, nil), true)
+
+ for i := 0; i < 5; i++ {
+ // should run as user has already fallen back on wild card user limit set on "root.parent" map[memory:50 vcores:50]
+ increased := manager.IncreaseTrackedResource(queuePath1, TestApp1, usage, user)
+ assert.Equal(t, increased, true)
+ }
+
+ // should not run as user has exceeded wild card user limit set on "root.parent" map[memory:50 vcores:50]
+ headroom = manager.Headroom(queuePath1, TestApp3, user)
+ assert.Equal(t, headroom.FitInMaxUndef(usage), false)
+
+ // clear all configs. Since wild card user limit is not there, all users used its settings earlier under the same queue path should start using its own value
+ conf = createConfigWithoutLimits()
+ assert.NilError(t, manager.UpdateConfig(conf.Queues[0], "root"), err)
+
+ headroom = manager.Headroom(queuePath1, TestApp1, user)
+ assert.Equal(t, resources.Equals(headroom, nil), true)
+
+ assert.Equal(t, manager.GetUserTracker(user.User).queueTracker.maxRunningApps, uint64(0))
+ assert.Equal(t, manager.GetUserTracker(user.User).queueTracker.childQueueTrackers["parent"].maxRunningApps, uint64(0))
+ assert.Equal(t, manager.GetUserTracker(user.User).queueTracker.childQueueTrackers["parent"].childQueueTrackers["child1"].maxRunningApps, uint64(0))
+ assert.Equal(t, resources.Equals(manager.GetUserTracker(user.User).queueTracker.maxResources, nil), true)
+ assert.Equal(t, resources.Equals(manager.GetUserTracker(user.User).queueTracker.childQueueTrackers["parent"].maxResources, nil), true)
+ assert.Equal(t, resources.Equals(manager.GetUserTracker(user.User).queueTracker.childQueueTrackers["parent"].childQueueTrackers["child1"].maxResources, nil), true)
+
+ // set limit for user1 explicitly. New limit should precede the wild card user limit
+ conf = createUpdateConfigWithWildCardUsersAndGroups(user.User, user.Groups[0], "", "", "50", "50")
+ assert.NilError(t, manager.UpdateConfig(conf.Queues[0], "root"))
+
+ headroom = manager.Headroom(queuePath1, TestApp1, user)
+ assert.Equal(t, resources.Equals(headroom, resources.Multiply(usage, 2)), true)
+ assert.Equal(t, manager.GetUserTracker(user.User).queueTracker.useWildCard, false)
+ assert.Equal(t, manager.GetUserTracker(user.User).queueTracker.childQueueTrackers["parent"].useWildCard, false)
+ assert.Equal(t, manager.GetUserTracker(user.User).queueTracker.childQueueTrackers["parent"].childQueueTrackers["child1"].useWildCard, false)
}
func TestUpdateConfigWithWildCardUsersAndGroups(t *testing.T) {
@@ -379,8 +450,8 @@ func TestUpdateConfigWithWildCardUsersAndGroups(t *testing.T) {
assert.Equal(t, increased, true)
// should not run as user 'user' setting is map[memory:60 vcores:60] and total usage of "root.parent" is map[memory:60 vcores:60]
- increased = manager.IncreaseTrackedResource(queuePath1, TestApp3, usage, user)
- assert.Equal(t, increased, false)
+ headroom := manager.Headroom(queuePath1, TestApp3, user)
+ assert.Equal(t, headroom.FitInMaxUndef(usage), false)
// configure max resource for root.parent to allow one more application to run through wild card user settings (not through specific user)
// configure limits for user2 only. However, user1 should not be cleared as it has running applications
@@ -391,6 +462,9 @@ func TestUpdateConfigWithWildCardUsersAndGroups(t *testing.T) {
assert.Equal(t, manager.GetUserTracker(user.User) != nil, true)
assert.Equal(t, manager.GetGroupTracker(user.Groups[0]) != nil, true)
+ headroom = manager.Headroom(queuePath1, TestApp2, user)
+ assert.Equal(t, resources.Equals(headroom, usage), true)
+
// user1 still should be able to run app as wild card user '*' setting is map[memory:70 vcores:70] for "root.parent" and
// total usage of "root.parent" is map[memory:60 vcores:60]
increased = manager.IncreaseTrackedResource(queuePath1, TestApp2, usage, user)
@@ -398,8 +472,8 @@ func TestUpdateConfigWithWildCardUsersAndGroups(t *testing.T) {
// user1 should not be able to run app as wild card user '*' setting is map[memory:70 vcores:70] for "root.parent"
// and total usage of "root.parent" is map[memory:70 vcores:70]
- increased = manager.IncreaseTrackedResource(queuePath1, TestApp3, usage, user)
- assert.Equal(t, increased, false)
+ headroom = manager.Headroom(queuePath1, TestApp3, user)
+ assert.Equal(t, headroom.FitInMaxUndef(usage), false)
// configure max resource for group1 * root.parent (map[memory:70 vcores:70]) higher than wild card group * root.parent settings (map[memory:10 vcores:10])
// ensure group's specific settings has been used for enforcement checks as specific limits always has higher precedence when compared to wild card group limit settings
@@ -422,8 +496,8 @@ func TestUpdateConfigWithWildCardUsersAndGroups(t *testing.T) {
}
// user2 should not be able to run app as user2 max limit is map[memory:70 vcores:70] and usage so far is map[memory:70 vcores:70]
- increased = manager.IncreaseTrackedResource(queuePath1, TestApp1, usage, user1)
- assert.Equal(t, increased, false)
+ headroom = manager.Headroom(queuePath1, TestApp1, user1)
+ assert.Equal(t, headroom.FitInMaxUndef(usage), false)
user3 := security.UserGroup{User: "user3", Groups: []string{"group3"}}
conf = createUpdateConfigWithWildCardUsersAndGroups(user1.User, user1.Groups[0], "", "*", "10", "10")
@@ -436,15 +510,15 @@ func TestUpdateConfigWithWildCardUsersAndGroups(t *testing.T) {
// user4 (though belongs to different group, group4) should not be able to run app as group4 also
// uses wild card group limit settings map[memory:10 vcores:10]
user4 := security.UserGroup{User: "user4", Groups: []string{"group4"}}
- increased = manager.IncreaseTrackedResource(queuePath1, TestApp1, usage, user4)
- assert.Equal(t, increased, false)
+ headroom = manager.Headroom(queuePath1, TestApp1, user4)
+ assert.Equal(t, headroom.FitInMaxUndef(usage), false)
conf = createUpdateConfigWithWildCardUsersAndGroups(user4.User, user4.Groups[0], "", "*", "10", "10")
assert.NilError(t, manager.UpdateConfig(conf.Queues[0], "root"))
// Since app is TestApp1, gt of "*" would be used as it is already mapped. group4 won't be used
- increased = manager.IncreaseTrackedResource(queuePath1, TestApp1, usage, user4)
- assert.Equal(t, increased, false)
+ headroom = manager.Headroom(queuePath1, TestApp1, user4)
+ assert.Equal(t, headroom.FitInMaxUndef(usage), false)
// Now group4 would be used as user4 is running TestApp2 for the first time. So can be allowed to run upto resource usage map[memory:70 vcores:70]
for i := 1; i <= 7; i++ {
@@ -453,8 +527,8 @@ func TestUpdateConfigWithWildCardUsersAndGroups(t *testing.T) {
}
// user4 should not be able to run app as user4 max limit is map[memory:70 vcores:70] and usage so far is map[memory:70 vcores:70]
- increased = manager.IncreaseTrackedResource(queuePath1, TestApp1, usage, user4)
- assert.Equal(t, increased, false)
+ headroom = manager.Headroom(queuePath1, TestApp1, user4)
+ assert.Equal(t, headroom.FitInMaxUndef(usage), false)
}
func TestUpdateConfigClearEarlierSetLimits(t *testing.T) {
@@ -545,8 +619,8 @@ func TestUpdateConfigClearEarlierSetGroupLimits(t *testing.T) {
assert.Equal(t, increased, true, "unable to increase tracked resource: queuepath "+cQueue+", app "+TestApp1+", res "+usage.String())
}
assert.NilError(t, manager.UpdateConfig(conf.Queues[0], "root"))
- increased := manager.IncreaseTrackedResource(cQueue, TestApp1, usage, user)
- assert.Equal(t, increased, false, "unable to increase tracked resource: queuepath "+cQueue+", app "+TestApp1+", res "+usage.String())
+ headroom := manager.Headroom(cQueue, TestApp1, user)
+ assert.Equal(t, headroom.FitInMaxUndef(usage), false)
}
func TestSetMaxLimitsForRemovedUsers(t *testing.T) {
@@ -586,8 +660,8 @@ func TestSetMaxLimitsForRemovedUsers(t *testing.T) {
increased := manager.IncreaseTrackedResource("root.parent.leaf", TestApp1, usage, user)
assert.Equal(t, increased, true, "unable to increase tracked resource: queuepath root.parent.leaf, app "+TestApp1+", res "+usage.String())
- increased = manager.IncreaseTrackedResource("root.parent.leaf", TestApp1, usage, user)
- assert.Equal(t, increased, false, "unable to increase tracked resource: queuepath root.parent.leaf, app "+TestApp1+", res "+usage.String())
+ headroom := manager.Headroom("root.parent.leaf", TestApp1, user)
+ assert.Equal(t, headroom.FitInMaxUndef(usage), false)
assert.Equal(t, manager.GetUserTracker(user.User) != nil, true)
assert.Equal(t, manager.GetGroupTracker(user.Groups[0]) != nil, true)
@@ -595,13 +669,6 @@ func TestSetMaxLimitsForRemovedUsers(t *testing.T) {
decreased = manager.DecreaseTrackedResource("root.parent.leaf", TestApp1, usage, user, true)
assert.Equal(t, decreased, true, "unable to decrease tracked resource: queuepath root.parent.leaf, app "+TestApp1+", res "+usage.String())
- userTrackers := manager.GetUsersResources()
- userTracker := userTrackers[0]
- groupTrackers := manager.GetGroupsResources()
- groupTracker := groupTrackers[0]
- assert.Equal(t, true, manager.isUserRemovable(userTracker))
- assert.Equal(t, true, manager.isGroupRemovable(groupTracker))
-
conf = createConfigWithoutLimits()
assert.NilError(t, manager.UpdateConfig(conf.Queues[0], "root"))
@@ -650,8 +717,8 @@ func TestUserGroupHeadroom(t *testing.T) {
assert.Equal(t, manager.GetGroupTracker(user.Groups[0]) != nil, true)
assert.Equal(t, resources.Equals(headroom, resources.Multiply(usage, 0)), true)
- increased = manager.IncreaseTrackedResource("root.parent.leaf", TestApp1, usage, user)
- assert.Equal(t, increased, false, "unable to increase tracked resource: queuepath "+queuePath1+", app "+TestApp1+", res "+usage.String())
+ headroom = manager.Headroom("root.parent.leaf", TestApp1, user)
+ assert.Equal(t, headroom.FitInMaxUndef(usage), false)
// configure limits only for group
conf = createUpdateConfigWithWildCardUsersAndGroups("", user.Groups[0], "*", "*", "80", "80")
@@ -749,10 +816,10 @@ func TestUserGroupLimitWithMultipleApps(t *testing.T) {
assert.Equal(t, resources.Equals(gt2.queueTracker.resourceUsage, usage), true)
// limit has reached for both the groups
- increased = manager.IncreaseTrackedResource(queuePath1, TestApp1, usage, userGroup)
- assert.Equal(t, increased, false)
- increased = manager.IncreaseTrackedResource(queuePath2, TestApp2, usage, userGroup)
- assert.Equal(t, increased, false)
+ headroom := manager.Headroom(queuePath1, TestApp1, userGroup)
+ assert.Equal(t, resources.Equals(headroom, resources.Zero), true, "init headroom is not expected")
+ headroom = manager.Headroom(queuePath2, TestApp2, userGroup)
+ assert.Equal(t, resources.Equals(headroom, resources.Zero), true, "init headroom is not expected")
// remove the apps
decreased := manager.DecreaseTrackedResource(queuePath1, TestApp1, usage, userGroup, true)
@@ -962,6 +1029,8 @@ func TestUserGroupLimit(t *testing.T) { //nolint:funlen
conf configs.PartitionConfig
initExpectedHeadroomResource map[string]string
finalExpectedHeadroomResource map[string]string
+ canRunApp bool
+ isHeadroomAvailable bool
}{
// unmixed user and group limit
{
@@ -972,6 +1041,7 @@ func TestUserGroupLimit(t *testing.T) { //nolint:funlen
}),
initExpectedHeadroomResource: mediumResource,
finalExpectedHeadroomResource: nil,
+ canRunApp: true,
},
{
name: "maxapplications with a specific user limit",
@@ -981,6 +1051,7 @@ func TestUserGroupLimit(t *testing.T) { //nolint:funlen
}),
initExpectedHeadroomResource: largeResource,
finalExpectedHeadroomResource: mediumResource,
+ isHeadroomAvailable: true,
},
{
name: "maxresources with a specific user limit and a wildcard user limit for a not specific user",
@@ -991,6 +1062,7 @@ func TestUserGroupLimit(t *testing.T) { //nolint:funlen
}),
initExpectedHeadroomResource: mediumResource,
finalExpectedHeadroomResource: nil,
+ canRunApp: true,
},
{
name: "maxapplications with a specific user limit and a wildcard user limit for a not specific user",
@@ -1001,6 +1073,7 @@ func TestUserGroupLimit(t *testing.T) { //nolint:funlen
}),
initExpectedHeadroomResource: largeResource,
finalExpectedHeadroomResource: mediumResource,
+ isHeadroomAvailable: true,
},
{
name: "maxresources with a specific user limit and a wildcard user limit for a specific user",
@@ -1011,6 +1084,7 @@ func TestUserGroupLimit(t *testing.T) { //nolint:funlen
}),
initExpectedHeadroomResource: mediumResource,
finalExpectedHeadroomResource: nil,
+ canRunApp: true,
},
{
name: "maxapplications with a specific user limit and a wildcard user limit for a specific user",
@@ -1021,6 +1095,7 @@ func TestUserGroupLimit(t *testing.T) { //nolint:funlen
}),
initExpectedHeadroomResource: largeResource,
finalExpectedHeadroomResource: mediumResource,
+ isHeadroomAvailable: true,
},
{
name: "maxresources with a wildcard user limit",
@@ -1030,6 +1105,7 @@ func TestUserGroupLimit(t *testing.T) { //nolint:funlen
}),
initExpectedHeadroomResource: mediumResource,
finalExpectedHeadroomResource: nil,
+ canRunApp: true,
},
{
name: "maxapplications with a wildcard user limit",
@@ -1039,6 +1115,7 @@ func TestUserGroupLimit(t *testing.T) { //nolint:funlen
}),
initExpectedHeadroomResource: largeResource,
finalExpectedHeadroomResource: mediumResource,
+ isHeadroomAvailable: true,
},
{
name: "maxresources with a specific group limit",
@@ -1048,6 +1125,7 @@ func TestUserGroupLimit(t *testing.T) { //nolint:funlen
}),
initExpectedHeadroomResource: mediumResource,
finalExpectedHeadroomResource: nil,
+ canRunApp: true,
},
{
name: "maxapplications with a specific group limit",
@@ -1057,6 +1135,7 @@ func TestUserGroupLimit(t *testing.T) { //nolint:funlen
}),
initExpectedHeadroomResource: largeResource,
finalExpectedHeadroomResource: mediumResource,
+ isHeadroomAvailable: true,
},
{
name: "maxresources with a specific group limit and a wildcard group limit for a not specific group user",
@@ -1067,6 +1146,7 @@ func TestUserGroupLimit(t *testing.T) { //nolint:funlen
}),
initExpectedHeadroomResource: mediumResource,
finalExpectedHeadroomResource: nil,
+ canRunApp: true,
},
{
name: "maxapplications with a specific group limit and a wildcard group limit for a not specific group user",
@@ -1077,6 +1157,7 @@ func TestUserGroupLimit(t *testing.T) { //nolint:funlen
}),
initExpectedHeadroomResource: largeResource,
finalExpectedHeadroomResource: mediumResource,
+ isHeadroomAvailable: true,
},
{
name: "maxresources with a specific group limit and a wildcard group limit for a specific group user",
@@ -1087,6 +1168,7 @@ func TestUserGroupLimit(t *testing.T) { //nolint:funlen
}),
initExpectedHeadroomResource: mediumResource,
finalExpectedHeadroomResource: nil,
+ canRunApp: true,
},
{
name: "maxapplications with a specific group limit and a wildcard group limit for a specific group user",
@@ -1097,6 +1179,7 @@ func TestUserGroupLimit(t *testing.T) { //nolint:funlen
}),
initExpectedHeadroomResource: largeResource,
finalExpectedHeadroomResource: mediumResource,
+ isHeadroomAvailable: true,
},
// mixed user and group limit
{
@@ -1108,6 +1191,7 @@ func TestUserGroupLimit(t *testing.T) { //nolint:funlen
}),
initExpectedHeadroomResource: mediumResource,
finalExpectedHeadroomResource: nil,
+ canRunApp: true,
},
{
name: "maxapplications with user limit lower than group limit",
@@ -1118,6 +1202,7 @@ func TestUserGroupLimit(t *testing.T) { //nolint:funlen
}),
initExpectedHeadroomResource: largeResource,
finalExpectedHeadroomResource: mediumResource,
+ isHeadroomAvailable: true,
},
{
name: "maxresources with group limit lower than user limit",
@@ -1128,6 +1213,7 @@ func TestUserGroupLimit(t *testing.T) { //nolint:funlen
}),
initExpectedHeadroomResource: mediumResource,
finalExpectedHeadroomResource: nil,
+ canRunApp: true,
},
{
name: "maxapplications with group limit lower than user limit",
@@ -1138,6 +1224,7 @@ func TestUserGroupLimit(t *testing.T) { //nolint:funlen
}),
initExpectedHeadroomResource: largeResource,
finalExpectedHeadroomResource: mediumResource,
+ isHeadroomAvailable: true,
},
}
@@ -1171,9 +1258,9 @@ func TestUserGroupLimit(t *testing.T) { //nolint:funlen
assert.NilError(t, err, fmt.Sprintf("can't create resource from %v", tc.finalExpectedHeadroomResource))
headroom = manager.Headroom(queuePathParent, TestApp1, tc.user)
assert.Equal(t, resources.Equals(headroom, finalExpectedHeadroom), true, "final headroom is not expected")
-
- increased = manager.IncreaseTrackedResource(queuePathParent, TestApp2, usage, tc.user)
- assert.Equal(t, increased, false, "should not increase tracked resource: queuepath "+queuePathParent+", app "+TestApp2+", res "+usage.String())
+ assert.Equal(t, manager.CanRunApp(queuePathParent, TestApp2, tc.user), tc.canRunApp)
+ headroom = manager.Headroom(queuePathParent, TestApp2, tc.user)
+ assert.Equal(t, headroom.FitInMaxUndef(usage), tc.isHeadroomAvailable)
})
}
}
@@ -1267,14 +1354,15 @@ func TestUserGroupMaxResourcesChange(t *testing.T) { //nolint:funlen
func TestUserGroupLimitChange(t *testing.T) { //nolint:funlen
testCases := []struct {
- name string
- user security.UserGroup
- limits []configs.Limit
- newLimits []configs.Limit
+ name string
+ user security.UserGroup
+ limits []configs.Limit
+ newLimits []configs.Limit
+ maxAppsExceeded bool
+ maxResourcesExceeded bool
}{
// user limit only
{
-
name: "maxresources with an updated specific user limit",
user: security.UserGroup{User: "user1", Groups: []string{"group1"}},
limits: []configs.Limit{
@@ -1283,6 +1371,7 @@ func TestUserGroupLimitChange(t *testing.T) { //nolint:funlen
newLimits: []configs.Limit{
createLimit([]string{"user1"}, nil, mediumResource, 2),
},
+ maxResourcesExceeded: true,
},
{
name: "maxapplications with an updated specific user limit",
@@ -1293,6 +1382,7 @@ func TestUserGroupLimitChange(t *testing.T) { //nolint:funlen
newLimits: []configs.Limit{
createLimit([]string{"user1"}, nil, largeResource, 1),
},
+ maxAppsExceeded: true,
},
// group limit only
@@ -1306,6 +1396,7 @@ func TestUserGroupLimitChange(t *testing.T) { //nolint:funlen
newLimits: []configs.Limit{
createLimit(nil, []string{"group1"}, mediumResource, 2),
},
+ maxResourcesExceeded: true,
},
{
name: "maxapplications with an updated specific group limit",
@@ -1316,6 +1407,7 @@ func TestUserGroupLimitChange(t *testing.T) { //nolint:funlen
newLimits: []configs.Limit{
createLimit(nil, []string{"group1"}, largeResource, 1),
},
+ maxAppsExceeded: true,
},
// user wildcard limit
@@ -1330,6 +1422,7 @@ func TestUserGroupLimitChange(t *testing.T) { //nolint:funlen
createLimit([]string{"user1"}, nil, largeResource, 2),
createLimit([]string{"*"}, nil, mediumResource, 2),
},
+ maxResourcesExceeded: true,
},
{
name: "maxapplications with an updated wildcard user limit",
@@ -1342,6 +1435,7 @@ func TestUserGroupLimitChange(t *testing.T) { //nolint:funlen
createLimit([]string{"user1"}, nil, largeResource, 2),
createLimit([]string{"*"}, nil, largeResource, 1),
},
+ maxAppsExceeded: true,
},
// group wildcard limit
@@ -1357,6 +1451,7 @@ func TestUserGroupLimitChange(t *testing.T) { //nolint:funlen
createLimit(nil, []string{"group1"}, largeResource, 2),
createLimit(nil, []string{"*"}, mediumResource, 2),
},
+ maxResourcesExceeded: true,
},
{
name: "maxapplications with an updated wildcard group limit",
@@ -1369,6 +1464,7 @@ func TestUserGroupLimitChange(t *testing.T) { //nolint:funlen
createLimit(nil, []string{"group1"}, largeResource, 2),
createLimit(nil, []string{"*"}, largeResource, 1),
},
+ maxAppsExceeded: true,
},
// in a different limit
@@ -1391,6 +1487,7 @@ func TestUserGroupLimitChange(t *testing.T) { //nolint:funlen
MaxApplications: 2,
},
},
+ maxResourcesExceeded: true,
},
{
name: "maxapplications with a new specific user limit",
@@ -1411,6 +1508,7 @@ func TestUserGroupLimitChange(t *testing.T) { //nolint:funlen
MaxApplications: 1,
},
},
+ maxAppsExceeded: true,
},
{
name: "maxresources with a new specific group limit",
@@ -1431,6 +1529,7 @@ func TestUserGroupLimitChange(t *testing.T) { //nolint:funlen
MaxApplications: 2,
},
},
+ maxResourcesExceeded: true,
},
{
name: "maxapplications with a new specific group limit",
@@ -1451,6 +1550,7 @@ func TestUserGroupLimitChange(t *testing.T) { //nolint:funlen
MaxApplications: 1,
},
},
+ maxAppsExceeded: true,
},
}
@@ -1480,8 +1580,9 @@ func TestUserGroupLimitChange(t *testing.T) { //nolint:funlen
conf.Queues[0].Queues[0].Limits = tc.newLimits
assert.NilError(t, manager.UpdateConfig(conf.Queues[0], "root"))
- increased = manager.IncreaseTrackedResource(queuePathParent, TestApp2, usage, tc.user)
- assert.Equal(t, increased, false, "should not increase tracked resource: queuepath "+queuePathParent+", app "+TestApp2+", res "+usage.String())
+ assert.Equal(t, manager.CanRunApp(queuePathParent, TestApp2, tc.user), !tc.maxAppsExceeded)
+ headroom := manager.Headroom(queuePathParent, TestApp2, tc.user)
+ assert.Equal(t, headroom.FitInMaxUndef(usage), !tc.maxResourcesExceeded)
})
}
}
@@ -1524,12 +1625,12 @@ func TestMultipleGroupLimitChange(t *testing.T) {
assert.Equal(t, increased, true, "unable to increase tracked resource: queuepath "+queuePathParent+", app test-app-1-2, res "+usage.String())
// user2 can't increase usage more than wildcard limit
- increased = manager.IncreaseTrackedResource(queuePathParent, "test-app-2-2", usage, user2)
- assert.Equal(t, increased, false, "should not increase tracked resource: queuepath "+queuePathParent+", app test-app-2-2, res "+usage.String())
+ headroom := manager.Headroom(queuePathParent, "test-app-2-2", user2)
+ assert.Equal(t, headroom.FitInMaxUndef(usage), false)
// user3 can't increase usage more than wildcard limit
- increased = manager.IncreaseTrackedResource(queuePathParent, "test-app-3-2", usage, user3)
- assert.Equal(t, increased, false, "should not increase tracked resource: queuepath "+queuePathParent+", app test-app-3-2, res "+usage.String())
+ headroom = manager.Headroom(queuePathParent, "test-app-3-2", user3)
+ assert.Equal(t, headroom.FitInMaxUndef(usage), false)
}
func createLimit(users, groups []string, maxResources map[string]string, maxApps uint64) configs.Limit {
diff --git a/pkg/scheduler/ugm/queue_tracker.go b/pkg/scheduler/ugm/queue_tracker.go
index 7358cc64..93956333 100644
--- a/pkg/scheduler/ugm/queue_tracker.go
+++ b/pkg/scheduler/ugm/queue_tracker.go
@@ -36,14 +36,15 @@ type QueueTracker struct {
maxResources *resources.Resource
maxRunningApps uint64
childQueueTrackers map[string]*QueueTracker
+ useWildCard bool
}
-func newRootQueueTracker() *QueueTracker {
- qt := newQueueTracker(common.Empty, configs.RootQueue)
+func newRootQueueTracker(trackType trackingType) *QueueTracker {
+ qt := newQueueTracker(common.Empty, configs.RootQueue, trackType)
return qt
}
-func newQueueTracker(queuePath string, queueName string) *QueueTracker {
+func newQueueTracker(queuePath string, queueName string, trackType trackingType) *QueueTracker {
qp := queueName
if queuePath != common.Empty {
qp = queuePath + "." + queueName
@@ -57,6 +58,20 @@ func newQueueTracker(queuePath string, queueName string) *QueueTracker {
maxRunningApps: 0,
childQueueTrackers: make(map[string]*QueueTracker),
}
+
+ // Override user/group specific limits with wild card limit settings
+ if trackType == user {
+ if config := m.getUserWildCardLimitsConfig(queuePath + "." + queueName); config != nil {
+ log.Log(log.SchedUGM).Debug("Use wild card limit settings as there is no limit set explicitly",
+ zap.String("queue name", queueName),
+ zap.String("queue path", queuePath),
+ zap.Uint64("max applications", config.maxApplications),
+ zap.Stringer("max resources", config.maxResources))
+ queueTracker.maxResources = config.maxResources.Clone()
+ queueTracker.maxRunningApps = config.maxApplications
+ queueTracker.useWildCard = true
+ }
+ }
log.Log(log.SchedUGM).Debug("Created queue tracker object for queue",
zap.String("queue", queueName))
return queueTracker
@@ -76,93 +91,32 @@ func (qt *QueueTracker) increaseTrackedResource(hierarchy []string, applicationI
zap.String("queue path", qt.queuePath),
zap.Strings("hierarchy", hierarchy),
zap.String("application", applicationID),
- zap.Stringer("resource", usage))
+ zap.Stringer("resource", usage),
+ zap.Bool("use wild card", qt.useWildCard))
// depth first: all the way to the leaf, create if not exists
// more than 1 in the slice means we need to recurse down
if len(hierarchy) > 1 {
childName := hierarchy[1]
if qt.childQueueTrackers[childName] == nil {
- qt.childQueueTrackers[childName] = newQueueTracker(qt.queuePath, childName)
+ qt.childQueueTrackers[childName] = newQueueTracker(qt.queuePath, childName, trackType)
}
if !qt.childQueueTrackers[childName].increaseTrackedResource(hierarchy[1:], applicationID, trackType, usage) {
return false
}
}
-
if qt.resourceUsage == nil {
qt.resourceUsage = resources.NewResource()
}
- finalResourceUsage := qt.resourceUsage.Clone()
- finalResourceUsage.AddTo(usage)
- wildCardQuotaExceeded := false
- existingApp := qt.runningApplications[applicationID]
-
- // apply user/group specific limit settings set if configured, otherwise use wild card limit settings
- if qt.maxRunningApps != 0 && !resources.IsZero(qt.maxResources) {
- log.Log(log.SchedUGM).Debug("applying enforcement checks using limit settings of specific user/group",
- zap.Int("tracking type", int(trackType)),
- zap.String("queue path", qt.queuePath),
- zap.Bool("existing app", existingApp),
- zap.Uint64("max running apps", qt.maxRunningApps),
- zap.Stringer("max resources", qt.maxResources))
- if (!existingApp && len(qt.runningApplications)+1 > int(qt.maxRunningApps)) ||
- resources.StrictlyGreaterThan(finalResourceUsage, qt.maxResources) {
- log.Log(log.SchedUGM).Warn("Unable to increase resource usage as allowing new application to run would exceed either configured max applications or max resources limit of specific user/group",
- zap.Int("tracking type", int(trackType)),
- zap.String("queue path", qt.queuePath),
- zap.Bool("existing app", existingApp),
- zap.Int("current running applications", len(qt.runningApplications)),
- zap.Uint64("max running applications", qt.maxRunningApps),
- zap.Stringer("current resource usage", qt.resourceUsage),
- zap.Stringer("max resource usage", qt.maxResources))
- return false
- }
- }
-
- // Try wild card settings
- if qt.maxRunningApps == 0 && resources.IsZero(qt.maxResources) {
- // Is there any wild card settings? Do we need to apply enforcement checks using wild card limit settings?
- var config *LimitConfig
- if trackType == user {
- config = m.getUserWildCardLimitsConfig(qt.queuePath)
- } else if trackType == group {
- config = m.getGroupWildCardLimitsConfig(qt.queuePath)
- }
- if config != nil {
- wildCardQuotaExceeded = (config.maxApplications != 0 && !existingApp && len(qt.runningApplications)+1 > int(config.maxApplications)) ||
- (!resources.IsZero(config.maxResources) && resources.StrictlyGreaterThan(finalResourceUsage, config.maxResources))
- log.Log(log.SchedUGM).Debug("applying enforcement checks using limit settings of wild card user/group",
- zap.Int("tracking type", int(trackType)),
- zap.String("queue path", qt.queuePath),
- zap.Bool("existing app", existingApp),
- zap.Uint64("wild card max running apps", config.maxApplications),
- zap.Stringer("wild card max resources", config.maxResources),
- zap.Bool("wild card quota exceeded", wildCardQuotaExceeded))
- if wildCardQuotaExceeded {
- log.Log(log.SchedUGM).Warn("Unable to increase resource usage as allowing new application to run would exceed either configured max applications or max resources limit of wild card user/group",
- zap.Int("tracking type", int(trackType)),
- zap.String("queue path", qt.queuePath),
- zap.Bool("existing app", existingApp),
- zap.Int("current running applications", len(qt.runningApplications)),
- zap.Uint64("max running applications", config.maxApplications),
- zap.Stringer("current resource usage", qt.resourceUsage),
- zap.Stringer("max resource usage", config.maxResources))
- return false
- }
- }
- }
-
qt.resourceUsage.AddTo(usage)
qt.runningApplications[applicationID] = true
-
log.Log(log.SchedUGM).Debug("Successfully increased resource usage",
zap.Int("tracking type", int(trackType)),
zap.String("queue path", qt.queuePath),
zap.String("application", applicationID),
- zap.Bool("existing app", existingApp),
zap.Stringer("resource", usage),
zap.Uint64("max running applications", qt.maxRunningApps),
zap.Stringer("max resource usage", qt.maxResources),
+ zap.Bool("use wild card", qt.useWildCard),
zap.Stringer("total resource after increasing", qt.resourceUsage),
zap.Int("total applications after increasing", len(qt.runningApplications)))
return true
@@ -220,23 +174,29 @@ func (qt *QueueTracker) decreaseTrackedResource(hierarchy []string, applicationI
return removeQT, true
}
-func (qt *QueueTracker) setLimit(hierarchy []string, maxResource *resources.Resource, maxApps uint64) {
+func (qt *QueueTracker) setLimit(hierarchy []string, maxResource *resources.Resource, maxApps uint64, useWildCard bool, trackType trackingType, doWildCardCheck bool) {
log.Log(log.SchedUGM).Debug("Setting limits",
zap.String("queue path", qt.queuePath),
zap.Strings("hierarchy", hierarchy),
zap.Uint64("max applications", maxApps),
- zap.Stringer("max resources", maxResource))
+ zap.Stringer("max resources", maxResource),
+ zap.Bool("use wild card", useWildCard))
// depth first: all the way to the leaf, create if not exists
// more than 1 in the slice means we need to recurse down
if len(hierarchy) > 1 {
childName := hierarchy[1]
if qt.childQueueTrackers[childName] == nil {
- qt.childQueueTrackers[childName] = newQueueTracker(qt.queuePath, childName)
+ qt.childQueueTrackers[childName] = newQueueTracker(qt.queuePath, childName, trackType)
}
- qt.childQueueTrackers[childName].setLimit(hierarchy[1:], maxResource, maxApps)
+ qt.childQueueTrackers[childName].setLimit(hierarchy[1:], maxResource, maxApps, useWildCard, trackType, false)
} else if len(hierarchy) == 1 {
+ // don't override named user/group specific limits with wild card limits
+ if doWildCardCheck && !qt.useWildCard {
+ return
+ }
qt.maxRunningApps = maxApps
qt.maxResources = maxResource
+ qt.useWildCard = useWildCard
}
}
@@ -247,7 +207,7 @@ func (qt *QueueTracker) headroom(hierarchy []string, trackType trackingType) *re
if len(hierarchy) > 1 {
childName := hierarchy[1]
if qt.childQueueTrackers[childName] == nil {
- qt.childQueueTrackers[childName] = newQueueTracker(qt.queuePath, childName)
+ qt.childQueueTrackers[childName] = newQueueTracker(qt.queuePath, childName, trackType)
}
childHeadroom = qt.childQueueTrackers[childName].headroom(hierarchy[1:], trackType)
}
@@ -256,20 +216,8 @@ func (qt *QueueTracker) headroom(hierarchy []string, trackType trackingType) *re
if !resources.IsZero(qt.maxResources) {
headroom = qt.maxResources.Clone()
headroom.SubOnlyExisting(qt.resourceUsage)
- } else if resources.IsZero(childHeadroom) {
- // If childHeadroom is not nil, it means there is an user or wildcard limit config in child queue,
- // so we don't check wildcard limit config in current queue.
-
- // Fall back on wild card user limit settings to calculate headroom only for unnamed users.
- // For unnamed groups, "*" group tracker object would be used using the above block to calculate headroom
- // because resource usage added together for all unnamed groups under "*" group tracker object.
- if trackType == user {
- if config := m.getUserWildCardLimitsConfig(qt.queuePath); config != nil {
- headroom = config.maxResources.Clone()
- headroom.SubOnlyExisting(qt.resourceUsage)
- }
- }
}
+
if headroom == nil {
return childHeadroom
}
@@ -418,7 +366,7 @@ func (qt *QueueTracker) canRunApp(hierarchy []string, applicationID string, trac
if len(hierarchy) > 1 {
childName := hierarchy[1]
if qt.childQueueTrackers[childName] == nil {
- qt.childQueueTrackers[childName] = newQueueTracker(qt.queuePath, childName)
+ qt.childQueueTrackers[childName] = newQueueTracker(qt.queuePath, childName, trackType)
}
childCanRunApp = qt.childQueueTrackers[childName].canRunApp(hierarchy[1:], applicationID, trackType)
}
@@ -439,19 +387,6 @@ func (qt *QueueTracker) canRunApp(hierarchy []string, applicationID string, trac
if qt.maxRunningApps != 0 && running > int(qt.maxRunningApps) {
return false
}
-
- // Try wild card settings
- if qt.maxRunningApps == 0 {
- var config *LimitConfig
- if trackType == user {
- config = m.getUserWildCardLimitsConfig(qt.queuePath)
- } else if trackType == group {
- config = m.getGroupWildCardLimitsConfig(qt.queuePath)
- }
- if config != nil && config.maxApplications != 0 && running > int(config.maxApplications) {
- return false
- }
- }
return true
}
diff --git a/pkg/scheduler/ugm/queue_tracker_test.go b/pkg/scheduler/ugm/queue_tracker_test.go
index 9537b3ba..d897cddc 100644
--- a/pkg/scheduler/ugm/queue_tracker_test.go
+++ b/pkg/scheduler/ugm/queue_tracker_test.go
@@ -34,7 +34,7 @@ func TestQTIncreaseTrackedResource(t *testing.T) {
// root->parent->child2
// root->parent->child12 (similar name like above leaf queue, but it is being treated differently as similar names are allowed)
GetUserManager()
- queueTracker := newQueueTracker("", "root")
+ queueTracker := newQueueTracker("", "root", user)
usage1, err := resources.NewResourceFromConf(map[string]string{"mem": "10M", "vcore": "10"})
if err != nil {
t.Errorf("new resource create returned error or wrong resource: error %t, res %v", err, usage1)
@@ -87,7 +87,7 @@ func TestQTDecreaseTrackedResource(t *testing.T) {
// root->parent->child1
// root->parent->child2
GetUserManager()
- queueTracker := newQueueTracker("", "root")
+ queueTracker := newQueueTracker("", "root", user)
usage1, err := resources.NewResourceFromConf(map[string]string{"mem": "70M", "vcore": "70"})
if err != nil {
t.Errorf("new resource create returned error or wrong resource: error %t, res %v", err, usage1)
@@ -191,7 +191,7 @@ func TestQTQuotaEnforcement(t *testing.T) {
// root->parent->child2. max apps - 2 , max res - 20M, 20cores
// root->parent->child12 (similar name like above leaf queue, but it is being treated differently as similar names are allowed). config not set
GetUserManager()
- queueTracker := newQueueTracker("", "root")
+ queueTracker := newQueueTracker("", "root", user)
usage1, err := resources.NewResourceFromConf(map[string]string{"mem": "10M", "vcore": "10"})
if err != nil {
@@ -201,17 +201,17 @@ func TestQTQuotaEnforcement(t *testing.T) {
queueTracker.maxResources = resources.Multiply(usage1, 6)
queueTracker.maxRunningApps = 6
- parentQueueTracker := newQueueTracker("root", "parent")
+ parentQueueTracker := newQueueTracker("root", "parent", user)
parentQueueTracker.maxResources = resources.Multiply(usage1, 5)
parentQueueTracker.maxRunningApps = 5
queueTracker.childQueueTrackers["parent"] = parentQueueTracker
- child1QueueTracker := newQueueTracker("root.parent", "child1")
+ child1QueueTracker := newQueueTracker("root.parent", "child1", user)
child1QueueTracker.maxResources = resources.Multiply(usage1, 2)
child1QueueTracker.maxRunningApps = 2
parentQueueTracker.childQueueTrackers["child1"] = child1QueueTracker
- child2QueueTracker := newQueueTracker("root.parent.child2", "child2")
+ child2QueueTracker := newQueueTracker("root.parent.child2", "child2", user)
child2QueueTracker.maxResources = resources.Multiply(usage1, 2)
child2QueueTracker.maxRunningApps = 2
parentQueueTracker.childQueueTrackers["child2"] = child2QueueTracker
@@ -231,10 +231,8 @@ func TestQTQuotaEnforcement(t *testing.T) {
t.Fatalf("unable to increase tracked resource: queuepath %s, app %s, res %v", queuePath2, TestApp2, usage1)
}
- result = queueTracker.increaseTrackedResource(strings.Split(queuePath2, configs.DOT), TestApp3, user, usage1)
- if result {
- t.Fatalf("Increasing resource usage should fail as child2's resource usage exceeded configured max resources limit. queuepath %s, app %s, res %v", queuePath2, TestApp3, usage1)
- }
+ headroom := queueTracker.headroom(strings.Split(queuePath2, configs.DOT), user)
+ assert.Equal(t, headroom.FitInMaxUndef(usage1), false)
result = queueTracker.increaseTrackedResource(strings.Split(queuePath3, configs.DOT), TestApp3, user, usage1)
if !result {
@@ -246,10 +244,8 @@ func TestQTQuotaEnforcement(t *testing.T) {
t.Fatalf("unable to increase tracked resource: queuepath %s, app %s, res %v", queuePath4, TestApp4, usage1)
}
- result = queueTracker.increaseTrackedResource(strings.Split(queuePath4, configs.DOT), TestApp4, user, usage1)
- if result {
- t.Fatalf("Increasing resource usage should fail as parent's resource usage exceeded configured max resources limit. queuepath %s, app %s, res %v", queuePath4, TestApp4, usage1)
- }
+ headroom = queueTracker.headroom(strings.Split(queuePath4, configs.DOT), user)
+ assert.Equal(t, headroom.FitInMaxUndef(usage1), false)
}
func TestHeadroom(t *testing.T) {
@@ -258,11 +254,11 @@ func TestHeadroom(t *testing.T) {
hierarchy := strings.Split(path, configs.DOT)
// nothing exists make sure the hierarchy gets created
- root := newRootQueueTracker()
- root.childQueueTrackers["parent"] = newQueueTracker("root", "parent")
+ root := newRootQueueTracker(user)
+ root.childQueueTrackers["parent"] = newQueueTracker("root", "parent", user)
parent := root.childQueueTrackers["parent"]
assert.Assert(t, parent != nil, "parent queue tracker should have been created")
- parent.childQueueTrackers["leaf"] = newQueueTracker("root.parent", "leaf")
+ parent.childQueueTrackers["leaf"] = newQueueTracker("root.parent", "leaf", user)
leaf := parent.childQueueTrackers["leaf"]
assert.Assert(t, leaf != nil, "leaf queue tracker should have been created")
diff --git a/pkg/scheduler/ugm/user_tracker.go b/pkg/scheduler/ugm/user_tracker.go
index f2361cd1..97a4a60e 100644
--- a/pkg/scheduler/ugm/user_tracker.go
+++ b/pkg/scheduler/ugm/user_tracker.go
@@ -42,10 +42,10 @@ type UserTracker struct {
sync.RWMutex
}
-func newUserTracker(user string) *UserTracker {
- queueTracker := newRootQueueTracker()
+func newUserTracker(userName string) *UserTracker {
+ queueTracker := newRootQueueTracker(user)
userTracker := &UserTracker{
- userName: user,
+ userName: userName,
appGroupTrackers: make(map[string]*GroupTracker),
queueTracker: queueTracker,
}
@@ -115,10 +115,10 @@ func (ut *UserTracker) getTrackedApplications() map[string]*GroupTracker {
return ut.appGroupTrackers
}
-func (ut *UserTracker) setLimits(hierarchy []string, resource *resources.Resource, maxApps uint64) {
+func (ut *UserTracker) setLimits(hierarchy []string, resource *resources.Resource, maxApps uint64, useWildCard bool, doWildCardCheck bool) {
ut.Lock()
defer ut.Unlock()
- ut.queueTracker.setLimit(hierarchy, resource, maxApps)
+ ut.queueTracker.setLimit(hierarchy, resource, maxApps, useWildCard, user, doWildCardCheck)
}
func (ut *UserTracker) headroom(hierarchy []string) *resources.Resource {
diff --git a/pkg/scheduler/ugm/user_tracker_test.go b/pkg/scheduler/ugm/user_tracker_test.go
index ed18570b..258fb15d 100644
--- a/pkg/scheduler/ugm/user_tracker_test.go
+++ b/pkg/scheduler/ugm/user_tracker_test.go
@@ -206,8 +206,8 @@ func TestSetMaxLimits(t *testing.T) {
t.Fatalf("unable to increase tracked resource: queuepath %+q, app %s, res %v, error %t", hierarchy1, TestApp1, usage1, err)
}
- userTracker.setLimits(hierarchy1, resources.Multiply(usage1, 5), 5)
- userTracker.setLimits(hierarchy5, resources.Multiply(usage1, 10), 10)
+ userTracker.setLimits(hierarchy1, resources.Multiply(usage1, 5), 5, false, false)
+ userTracker.setLimits(hierarchy5, resources.Multiply(usage1, 10), 10, false, false)
result = userTracker.increaseTrackedResource(hierarchy1, TestApp1, usage1)
if !result {
@@ -218,8 +218,8 @@ func TestSetMaxLimits(t *testing.T) {
if !result {
t.Fatalf("unable to increase tracked resource: queuepath %+q, app %s, res %v", hierarchy1, TestApp2, usage1)
}
- userTracker.setLimits(hierarchy1, usage1, 1)
- userTracker.setLimits(hierarchy5, usage1, 1)
+ userTracker.setLimits(hierarchy1, usage1, 1, false, false)
+ userTracker.setLimits(hierarchy5, usage1, 1, false, false)
}
func getUserResource(ut *UserTracker) map[string]*resources.Resource {
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@yunikorn.apache.org
For additional commands, e-mail: issues-help@yunikorn.apache.org