You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@yunikorn.apache.org by wi...@apache.org on 2023/08/07 05:03:29 UTC
[yunikorn-core] branch master updated: [YUNIKORN-1864] Apply wild card group limit settings (#599)
This is an automated email from the ASF dual-hosted git repository.
wilfreds pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/master by this push:
new 4983e914 [YUNIKORN-1864] Apply wild card group limit settings (#599)
4983e914 is described below
commit 4983e914a5a432b753b54919210027a3749c813a
Author: Manikandan R <ma...@gmail.com>
AuthorDate: Mon Aug 7 14:59:46 2023 +1000
[YUNIKORN-1864] Apply wild card group limit settings (#599)
For users without any matching group make sure the wildcard is checked
when a group is resolved.
Closes: #599
Signed-off-by: Wilfred Spiegelenburg <wi...@apache.org>
---
pkg/scheduler/ugm/group_tracker.go | 5 +++--
pkg/scheduler/ugm/manager.go | 42 ++++++++++++++++++++++++++------------
pkg/scheduler/ugm/manager_test.go | 39 ++++++++++++++++++++++++++++++++++-
pkg/scheduler/ugm/queue_tracker.go | 25 ++++++++++++-----------
pkg/scheduler/ugm/user_tracker.go | 10 ++++-----
5 files changed, 88 insertions(+), 33 deletions(-)
diff --git a/pkg/scheduler/ugm/group_tracker.go b/pkg/scheduler/ugm/group_tracker.go
index a48e9b22..ff410dc2 100644
--- a/pkg/scheduler/ugm/group_tracker.go
+++ b/pkg/scheduler/ugm/group_tracker.go
@@ -21,6 +21,7 @@ package ugm
import (
"sync"
+ "github.com/apache/yunikorn-core/pkg/common"
"github.com/apache/yunikorn-core/pkg/common/resources"
"github.com/apache/yunikorn-core/pkg/webservice/dao"
)
@@ -93,7 +94,7 @@ func (gt *GroupTracker) GetGroupResourceUsageDAOInfo() *dao.GroupResourceUsageDA
for app := range gt.applications {
groupResourceUsage.Applications = append(groupResourceUsage.Applications, app)
}
- groupResourceUsage.Queues = gt.queueTracker.getResourceUsageDAOInfo("")
+ groupResourceUsage.Queues = gt.queueTracker.getResourceUsageDAOInfo(common.Empty)
return groupResourceUsage
}
@@ -130,7 +131,7 @@ func (gt *GroupTracker) removeApp(applicationID string) {
func (gt *GroupTracker) getName() string {
if gt == nil {
- return ""
+ return common.Empty
}
return gt.groupName
}
diff --git a/pkg/scheduler/ugm/manager.go b/pkg/scheduler/ugm/manager.go
index 0f865ed9..d41b8220 100644
--- a/pkg/scheduler/ugm/manager.go
+++ b/pkg/scheduler/ugm/manager.go
@@ -78,7 +78,7 @@ func (m *Manager) IncreaseTrackedResource(queuePath, applicationID string, usage
zap.String("queue path", queuePath),
zap.String("application", applicationID),
zap.Stringer("resource", usage))
- if queuePath == "" || applicationID == "" || usage == nil || user.User == "" {
+ if queuePath == common.Empty || applicationID == common.Empty || usage == nil || user.User == common.Empty {
log.Log(log.SchedUGM).Debug("Mandatory parameters are missing to increase the resource usage",
zap.String("user", user.User),
zap.String("queue path", queuePath),
@@ -110,7 +110,7 @@ func (m *Manager) DecreaseTrackedResource(queuePath, applicationID string, usage
zap.String("application", applicationID),
zap.Stringer("resource", usage),
zap.Bool("removeApp", removeApp))
- if queuePath == "" || applicationID == "" || usage == nil || user.User == "" {
+ if queuePath == common.Empty || applicationID == common.Empty || usage == nil || user.User == common.Empty {
log.Log(log.SchedUGM).Debug("Mandatory parameters are missing to decrease the resource usage",
zap.String("user", user.User),
zap.String("queue path", queuePath),
@@ -236,7 +236,19 @@ func (m *Manager) ensureGroupTrackerForApp(queuePath string, applicationID strin
if !userTracker.hasGroupForApp(applicationID) {
var groupTracker *GroupTracker
group := m.internalEnsureGroup(user, queuePath)
- if group != "" {
+
+ // Use the wild card group (if configured) for users that don't have any matching group
+ if group == common.Empty {
+ parentQueuePath := queuePath
+ for parentQueuePath != common.Empty {
+ parentQueuePath, _ = getParentQueuePath(parentQueuePath)
+ if _, ok := m.groupWildCardLimitsConfig[parentQueuePath]; ok {
+ group = common.Wildcard
+ break
+ }
+ }
+ }
+ if group != common.Empty {
if m.groupTrackers[group] == nil {
log.Log(log.SchedUGM).Debug("Group tracker doesn't exists. Creating group tracker",
zap.String("application", applicationID),
@@ -257,6 +269,10 @@ func (m *Manager) ensureGroupTrackerForApp(queuePath string, applicationID strin
userTracker.setGroupForApp(applicationID, groupTracker)
return group
} else {
+ // A config change may alter which group would match this user (for example, the app was earlier mapped through a
+ // parent queue or the wild card group, and the new config now has group limits in the leaf queue itself).
+ // Such a change has no effect on an existing app group mapping: the already mapped group is used for the whole
+ // application lifecycle. The config change is applied only to new applications.
return userTracker.getGroupForApp(applicationID)
}
}
@@ -288,12 +304,12 @@ func (m *Manager) internalEnsureGroup(user security.UserGroup, queuePath string)
}
}
parentQueuePath, _ := getParentQueuePath(queuePath)
- if parentQueuePath != "" {
+ if parentQueuePath != common.Empty {
qt := userTracker.queueTracker.getChildQueueTracker(parentQueuePath)
return m.internalEnsureGroup(user, qt.queuePath)
}
}
- return ""
+ return common.Empty
}
func (m *Manager) isUserRemovable(ut *UserTracker) bool {
@@ -337,7 +353,7 @@ func (m *Manager) internalProcessConfig(cur configs.QueueConfig, queuePath strin
}
limitConfig := &LimitConfig{maxResources: maxResource, maxApplications: limit.MaxApplications}
for _, user := range limit.Users {
- if user == "" {
+ if user == common.Empty {
continue
}
log.Log(log.SchedUGM).Debug("Processing user limits configuration",
@@ -355,7 +371,7 @@ func (m *Manager) internalProcessConfig(cur configs.QueueConfig, queuePath strin
}
}
for _, group := range limit.Groups {
- if group == "" {
+ if group == common.Empty {
continue
}
log.Log(log.SchedUGM).Debug("Processing group limits configuration",
@@ -364,14 +380,14 @@ func (m *Manager) internalProcessConfig(cur configs.QueueConfig, queuePath strin
zap.String("queue path", queuePath),
zap.Uint64("max application", limit.MaxApplications),
zap.Any("max resources", limit.MaxResources))
- if group == common.Wildcard {
- m.groupWildCardLimitsConfig[queuePath] = limitConfig
- continue
- }
if err := m.processGroupConfig(group, limitConfig, queuePath, groupLimits); err != nil {
return err
}
- m.configuredGroups[queuePath] = append(m.configuredGroups[queuePath], group)
+ if group == common.Wildcard {
+ m.groupWildCardLimitsConfig[queuePath] = limitConfig
+ } else {
+ m.configuredGroups[queuePath] = append(m.configuredGroups[queuePath], group)
+ }
}
}
if err := m.clearEarlierSetLimits(userLimits, groupLimits, queuePath); err != nil {
@@ -606,7 +622,7 @@ func (m *Manager) Headroom(queuePath string, user security.UserGroup) *resources
zap.String("user headroom", userHeadroom.String()))
}
group := m.internalEnsureGroup(user, queuePath)
- if group != "" {
+ if group != common.Empty {
if m.groupTrackers[group] != nil {
groupHeadroom = m.groupTrackers[group].headroom(queuePath)
log.Log(log.SchedUGM).Debug("Calculated headroom for group",
diff --git a/pkg/scheduler/ugm/manager_test.go b/pkg/scheduler/ugm/manager_test.go
index 40482361..c56a0730 100644
--- a/pkg/scheduler/ugm/manager_test.go
+++ b/pkg/scheduler/ugm/manager_test.go
@@ -393,7 +393,7 @@ func TestUpdateConfigWithWildCardUsersAndGroups(t *testing.T) {
// configure max resource for user2 * root.parent (map[memory:70 vcores:70]) higher than wild card user * root.parent settings (map[memory:10 vcores:10])
// ensure user's specific settings has been used for enforcement checks as specific limits always has higher precedence when compared to wild card user limit settings
- conf = createUpdateConfigWithWildCardUsersAndGroups(user1.User, "", "*", "*", "10", "10")
+ conf = createUpdateConfigWithWildCardUsersAndGroups(user1.User, "", "*", "", "10", "10")
assert.NilError(t, manager.UpdateConfig(conf.Queues[0], "root"))
// can be allowed to run upto resource usage map[memory:70 vcores:70]
@@ -409,6 +409,43 @@ func TestUpdateConfigWithWildCardUsersAndGroups(t *testing.T) {
if increased {
t.Fatalf("unable to increase tracked resource: queuepath %s, app %s, res %v", queuePath1, TestApp1, user1)
}
+
+ user3 := security.UserGroup{User: "user3", Groups: []string{"group3"}}
+ conf = createUpdateConfigWithWildCardUsersAndGroups(user1.User, user1.Groups[0], "", "*", "10", "10")
+ assert.NilError(t, manager.UpdateConfig(conf.Queues[0], "root"))
+
+ // user3 should be able to run app as group3 uses wild card group limit settings map[memory:10 vcores:10]
+ increased = manager.IncreaseTrackedResource(queuePath1, TestApp1, usage, user3)
+ if !increased {
+ t.Fatalf("unable to increase tracked resource: queuepath %s, app %s, res %v", queuePath1, TestApp1, user)
+ }
+
+ // user4 (though belongs to different group, group4) should not be able to run app as group4 also
+ // uses wild card group limit settings map[memory:10 vcores:10]
+ user4 := security.UserGroup{User: "user4", Groups: []string{"group4"}}
+ increased = manager.IncreaseTrackedResource(queuePath1, TestApp1, usage, user4)
+ assert.Equal(t, increased, false)
+
+ conf = createUpdateConfigWithWildCardUsersAndGroups(user4.User, user4.Groups[0], "", "*", "10", "10")
+ assert.NilError(t, manager.UpdateConfig(conf.Queues[0], "root"))
+
+ // Since app is TestApp1, gt of "*" would be used as it is already mapped. group4 won't be used
+ increased = manager.IncreaseTrackedResource(queuePath1, TestApp1, usage, user4)
+ if increased {
+ t.Fatalf("unable to increase tracked resource: queuepath %s, app %s, res %v", queuePath1, TestApp1, user1)
+ }
+
+ // Now group4 would be used as user4 is running TestApp2 for the first time. So can be allowed to run upto resource usage map[memory:70 vcores:70]
+ for i := 1; i <= 7; i++ {
+ increased = manager.IncreaseTrackedResource(queuePath1, TestApp2, usage, user4)
+ if !increased {
+ t.Fatalf("unable to increase tracked resource: queuepath %s, app %s, res %v", queuePath1, TestApp1, user1)
+ }
+ }
+
+ // user4 should not be able to run app as user4 max limit is map[memory:70 vcores:70] and usage so far is map[memory:70 vcores:70]
+ increased = manager.IncreaseTrackedResource(queuePath1, TestApp1, usage, user4)
+ assert.Equal(t, increased, false)
}
func TestUpdateConfigClearEarlierSetLimits(t *testing.T) {
diff --git a/pkg/scheduler/ugm/queue_tracker.go b/pkg/scheduler/ugm/queue_tracker.go
index acc94ba1..c793ffc0 100644
--- a/pkg/scheduler/ugm/queue_tracker.go
+++ b/pkg/scheduler/ugm/queue_tracker.go
@@ -21,6 +21,7 @@ package ugm
import (
"go.uber.org/zap"
+ "github.com/apache/yunikorn-core/pkg/common"
"github.com/apache/yunikorn-core/pkg/common/configs"
"github.com/apache/yunikorn-core/pkg/common/resources"
"github.com/apache/yunikorn-core/pkg/log"
@@ -38,13 +39,13 @@ type QueueTracker struct {
}
func newRootQueueTracker() *QueueTracker {
- qt := newQueueTracker("", configs.RootQueue)
+ qt := newQueueTracker(common.Empty, configs.RootQueue)
return qt
}
func newQueueTracker(queuePath string, queueName string) *QueueTracker {
qp := queueName
- if queuePath != "" {
+ if queuePath != common.Empty {
qp = queuePath + "." + queueName
}
queueTracker := &QueueTracker{
@@ -136,7 +137,7 @@ func (qt *QueueTracker) increaseTrackedResource(queuePath string, applicationID
}
childQueuePath, immediateChildQueueName := getChildQueuePath(queuePath)
- if childQueuePath != "" {
+ if childQueuePath != common.Empty {
if qt.childQueueTrackers[immediateChildQueueName] == nil {
qt.childQueueTrackers[immediateChildQueueName] = newQueueTracker(qt.queuePath, immediateChildQueueName)
}
@@ -170,7 +171,7 @@ func (qt *QueueTracker) decreaseTrackedResource(queuePath string, applicationID
zap.Stringer("resource", usage),
zap.Bool("removeApp", removeApp))
childQueuePath, immediateChildQueueName := getChildQueuePath(queuePath)
- if childQueuePath != "" {
+ if childQueuePath != common.Empty {
if qt.childQueueTrackers[immediateChildQueueName] == nil {
log.Log(log.SchedUGM).Error("Child queueTracker tracker must be available in child queues map",
zap.String("child queueTracker name", immediateChildQueueName))
@@ -217,8 +218,8 @@ func (qt *QueueTracker) getChildQueueTracker(queuePath string) *QueueTracker {
var childQueuePath, immediateChildQueueName string
childQueuePath, immediateChildQueueName = getChildQueuePath(queuePath)
childQueueTracker := qt
- if childQueuePath != "" {
- for childQueuePath != "" {
+ if childQueuePath != common.Empty {
+ for childQueuePath != common.Empty {
if childQueueTracker != nil {
if len(childQueueTracker.childQueueTrackers) == 0 || childQueueTracker.childQueueTrackers[immediateChildQueueName] == nil {
newChildQt := newQueueTracker(qt.queuePath, immediateChildQueueName)
@@ -248,7 +249,7 @@ func (qt *QueueTracker) headroom(queuePath string) *resources.Resource {
log.Log(log.SchedUGM).Debug("Calculating headroom",
zap.String("queue path", queuePath))
childQueuePath, immediateChildQueueName := getChildQueuePath(queuePath)
- if childQueuePath != "" {
+ if childQueuePath != common.Empty {
if qt.childQueueTrackers[immediateChildQueueName] != nil {
headroom := qt.childQueueTrackers[immediateChildQueueName].headroom(childQueuePath)
if headroom != nil {
@@ -279,7 +280,7 @@ func (qt *QueueTracker) getResourceUsageDAOInfo(parentQueuePath string) *dao.Res
return &dao.ResourceUsageDAOInfo{}
}
fullQueuePath := parentQueuePath + "." + qt.queueName
- if parentQueuePath == "" {
+ if parentQueuePath == common.Empty {
fullQueuePath = qt.queueName
}
usage := &dao.ResourceUsageDAOInfo{
@@ -305,7 +306,7 @@ func (qt *QueueTracker) IsQueuePathTrackedCompletely(queuePath string) bool {
return true
}
childQueuePath, immediateChildQueueName := getChildQueuePath(queuePath)
- if immediateChildQueueName != "" {
+ if immediateChildQueueName != common.Empty {
if childUt, ok := qt.childQueueTrackers[immediateChildQueueName]; ok {
return childUt.IsQueuePathTrackedCompletely(childQueuePath)
}
@@ -319,7 +320,7 @@ func (qt *QueueTracker) IsQueuePathTrackedCompletely(queuePath string) bool {
// the leaf and its parent queue using UnlinkQT method. Otherwise, we should leave as it is.
func (qt *QueueTracker) IsUnlinkRequired(queuePath string) bool {
childQueuePath, immediateChildQueueName := getChildQueuePath(queuePath)
- if immediateChildQueueName != "" {
+ if immediateChildQueueName != common.Empty {
if childUt, ok := qt.childQueueTrackers[immediateChildQueueName]; ok {
return childUt.IsUnlinkRequired(childQueuePath)
}
@@ -344,13 +345,13 @@ func (qt *QueueTracker) UnlinkQT(queuePath string) bool {
zap.Int("no. of child queue trackers", len(qt.childQueueTrackers)))
childQueuePath, immediateChildQueueName := getChildQueuePath(queuePath)
- if childQueuePath == "" && len(qt.childQueueTrackers) > 0 {
+ if childQueuePath == common.Empty && len(qt.childQueueTrackers) > 0 {
for qName := range qt.childQueueTrackers {
qt.UnlinkQT(qt.queueName + configs.DOT + qName)
}
}
- if childQueuePath != "" {
+ if childQueuePath != common.Empty {
if qt.childQueueTrackers[immediateChildQueueName] != nil {
unlink := qt.childQueueTrackers[immediateChildQueueName].UnlinkQT(childQueuePath)
if unlink {
diff --git a/pkg/scheduler/ugm/user_tracker.go b/pkg/scheduler/ugm/user_tracker.go
index 17353424..0dcb4b95 100644
--- a/pkg/scheduler/ugm/user_tracker.go
+++ b/pkg/scheduler/ugm/user_tracker.go
@@ -23,6 +23,7 @@ import (
"go.uber.org/zap"
+ "github.com/apache/yunikorn-core/pkg/common"
"github.com/apache/yunikorn-core/pkg/common/resources"
"github.com/apache/yunikorn-core/pkg/log"
"github.com/apache/yunikorn-core/pkg/webservice/dao"
@@ -66,11 +67,10 @@ func (ut *UserTracker) increaseTrackedResource(queuePath, applicationID string,
if !increasedGroupUsage {
_, decreased := ut.queueTracker.decreaseTrackedResource(queuePath, applicationID, usage, false)
if !decreased {
- log.Log(log.SchedUGM).Error("Problem in increasing group resource usage. Hence tried to rollback user tracked usage, but failed to do.",
+ log.Log(log.SchedUGM).Error("User resource usage rollback has failed",
zap.String("queue path", queuePath),
zap.String("application", applicationID),
- zap.String("user", ut.userName),
- zap.Stringer("usage", usage))
+ zap.String("user", ut.userName))
}
}
return increasedGroupUsage
@@ -106,7 +106,7 @@ func (ut *UserTracker) getGroupForApp(applicationID string) string {
if ut.appGroupTrackers[applicationID] != nil {
return ut.appGroupTrackers[applicationID].groupName
}
- return ""
+ return common.Empty
}
func (ut *UserTracker) getTrackedApplications() map[string]*GroupTracker {
@@ -139,7 +139,7 @@ func (ut *UserTracker) GetUserResourceUsageDAOInfo() *dao.UserResourceUsageDAOIn
userResourceUsage.Groups[app] = gt.groupName
}
}
- userResourceUsage.Queues = ut.queueTracker.getResourceUsageDAOInfo("")
+ userResourceUsage.Queues = ut.queueTracker.getResourceUsageDAOInfo(common.Empty)
return userResourceUsage
}
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@yunikorn.apache.org
For additional commands, e-mail: issues-help@yunikorn.apache.org