You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@yunikorn.apache.org by "wilfred-s (via GitHub)" <gi...@apache.org> on 2023/06/21 05:56:53 UTC

[GitHub] [yunikorn-core] wilfred-s commented on a diff in pull request #563: [YUNIKORN-1610] Enforcement changes for User Based Quota

wilfred-s commented on code in PR #563:
URL: https://github.com/apache/yunikorn-core/pull/563#discussion_r1236305672


##########
pkg/scheduler/ugm/manager.go:
##########
@@ -35,23 +35,24 @@ var m *Manager
 
 const maxresources = "maxresources"
 const maxapplications = "maxapplications"
+const wildcard = "*"
 
 // Manager implements tracker. A User Group Manager to track the usage for both user and groups.
 // Holds object of both user and group trackers
 type Manager struct {
-	userTrackers      map[string]*UserTracker
-	groupTrackers     map[string]*GroupTracker
-	userLimitsConfig  map[string]map[string]map[string]interface{} // Hold limits settings of user * queue path
-	groupLimitsConfig map[string]map[string]map[string]interface{} // Hold limits settings of group * queue path
+	userTrackers              map[string]*UserTracker
+	groupTrackers             map[string]*GroupTracker
+	userWildCardLimitsConfig  map[string]map[string]interface{} // Hold limits settings of user '*'
+	groupWildCardLimitsConfig map[string]map[string]interface{} // Hold limits settings of group '*'

Review Comment:
   Simplify this: introduce a struct type with two fields, a Resource and a uint64, to store the wildcard settings in, and store that struct in the map. That saves one indirection and the continuous conversion of the limit object's `map[string]string` to a resource.



##########
pkg/scheduler/ugm/queue_tracker.go:
##########
@@ -31,38 +31,141 @@ import (
 
 type QueueTracker struct {
 	queueName           string
+	parent              *QueueTracker
+	queuePath           string
 	resourceUsage       *resources.Resource
 	runningApplications map[string]bool
-	maxResourceUsage    *resources.Resource
+	maxResources        *resources.Resource
 	maxRunningApps      uint64
 	childQueueTrackers  map[string]*QueueTracker
 }
 
 func newRootQueueTracker() *QueueTracker {
-	return newQueueTracker(configs.RootQueue)
+	qt := newQueueTracker(configs.RootQueue, nil)
+	return qt
 }
 
-func newQueueTracker(queueName string) *QueueTracker {
-	log.Logger().Debug("Creating queue tracker object for queue",
-		zap.String("queue", queueName))
+func newQueueTracker(queueName string, parent *QueueTracker) *QueueTracker {
 	queueTracker := &QueueTracker{
 		queueName:           queueName,
+		parent:              parent,
 		resourceUsage:       resources.NewResource(),
 		runningApplications: make(map[string]bool),
+		maxResources:        resources.NewResource(),
+		maxRunningApps:      0,
 		childQueueTrackers:  make(map[string]*QueueTracker),
 	}
+	parentQueueName := ""
+	if parent != nil {
+		queueTracker.queuePath = queueTracker.parent.queuePath + configs.DOT + queueName
+		parentQueueName = parent.queueName
+	} else if queueName == configs.RootQueue {
+		queueTracker.queuePath = configs.RootQueue
+	}
+	log.Logger().Debug("Created queue tracker object for queue",
+		zap.String("queue", queueName),
+		zap.String("parent", parentQueueName),
+		zap.String("queue path ", queueTracker.queuePath))
 	return queueTracker
 }
 
-func (qt *QueueTracker) increaseTrackedResource(queuePath string, applicationID string, usage *resources.Resource) error {
+type trackingType int
+
+const (
+	none trackingType = iota
+	user
+	group
+)
+
+func (qt *QueueTracker) increaseTrackedResource(queuePath string, applicationID string, trackType trackingType, usage *resources.Resource) bool {
 	log.Logger().Debug("Increasing resource usage",
+		zap.Int("tracking type", int(trackType)),
 		zap.String("queue path", queuePath),
 		zap.String("application", applicationID),
 		zap.Stringer("resource", usage))
 	if queuePath == "" || applicationID == "" || usage == nil {
-		return fmt.Errorf("mandatory parameters are missing. queuepath: %s, application id: %s, resource usage: %s",
-			queuePath, applicationID, usage.String())
+		return false
+	}
+
+	finalResourceUsage := qt.resourceUsage.Clone()
+	finalResourceUsage.AddTo(usage)
+	wildCardQuotaExceeded := false
+
+	// Are there any settings for specific user/group? If not, try wild card settings
+	if int(qt.maxRunningApps) == 0 && resources.Equals(resources.NewResource(), qt.maxResources) {
+		// Is there any wild card settings? Do we need to apply enforcement checks using wild card limit settings?
+		var config map[string]interface{}
+		if trackType == user {
+			config = m.getUserWildCardLimitsConfig(qt.queuePath)
+		} else if trackType == group {
+			config = m.getGroupWildCardLimitsConfig(qt.queuePath)
+		}
+		if config != nil {
+			var maxResources map[string]string
+			var maxApplications uint64
+			var ok bool
+			if maxApplications, ok = config[maxapplications].(uint64); !ok {
+				log.Logger().Debug("Problem in using the wild card limit max application settings.",
+					zap.String("queue path", queuePath),
+					zap.Int("tracking type", int(trackType)),
+					zap.Any("max applications ", config[maxapplications]))
+			}
+			if maxResources, ok = config[maxresources].(map[string]string); !ok {
+				log.Logger().Debug("Problem in using the wild card limit max resources settings.",
+					zap.String("queue path", queuePath),
+					zap.Int("tracking type", int(trackType)),
+					zap.Any("max applications ", config[maxresources]))
+			}
+			var maxResource *resources.Resource
+			var err error
+			if maxResource, err = resources.NewResourceFromConf(maxResources); err != nil {
+				log.Logger().Debug("Problem in using the wild card limit max resources settings.",
+					zap.String("queue path", queuePath),
+					zap.String("max resources after casting", maxResource.String()),
+					zap.Error(err))
+				return false
+			}
+			wildCardQuotaExceeded = (maxApplications != 0 && len(qt.runningApplications)+1 > int(maxApplications)) || (!resources.Equals(resources.NewResource(), maxResource) && resources.StrictlyGreaterThan(finalResourceUsage, maxResource))
+			log.Logger().Debug("using wild card limit settings as user/group specific limit settings not set",
+				zap.Int("tracking type", int(trackType)),
+				zap.String("queue path", queuePath),
+				zap.Uint64("wild card max running apps", maxApplications),
+				zap.String("wild card max resources", maxResource.String()),
+				zap.Bool("wild card quota exceeded", wildCardQuotaExceeded))
+		}

Review Comment:
   Return here if the wildcard check did not pass: the wildcard limit has been checked and no specific quota was set.



##########
pkg/scheduler/ugm/queue_tracker.go:
##########
@@ -48,20 +49,36 @@ func newQueueTracker(queueName string) *QueueTracker {
 		queueName:           queueName,
 		resourceUsage:       resources.NewResource(),
 		runningApplications: make(map[string]bool),
+		maxResourceUsage:    resources.NewResource(),
+		maxRunningApps:      0,
 		childQueueTrackers:  make(map[string]*QueueTracker),
 	}
 	return queueTracker
 }
 
-func (qt *QueueTracker) increaseTrackedResource(queuePath string, applicationID string, usage *resources.Resource) error {
+func (qt *QueueTracker) increaseTrackedResource(queuePath string, applicationID string, usage *resources.Resource) bool {
 	log.Logger().Debug("Increasing resource usage",
 		zap.String("queue path", queuePath),
 		zap.String("application", applicationID),
 		zap.Stringer("resource", usage))
 	if queuePath == "" || applicationID == "" || usage == nil {
-		return fmt.Errorf("mandatory parameters are missing. queuepath: %s, application id: %s, resource usage: %s",
-			queuePath, applicationID, usage.String())
+		return false
+	}

Review Comment:
   Agreed, remove the check.



##########
pkg/scheduler/ugm/queue_tracker.go:
##########
@@ -31,38 +31,141 @@ import (
 
 type QueueTracker struct {
 	queueName           string
+	parent              *QueueTracker
+	queuePath           string
 	resourceUsage       *resources.Resource
 	runningApplications map[string]bool
-	maxResourceUsage    *resources.Resource
+	maxResources        *resources.Resource
 	maxRunningApps      uint64
 	childQueueTrackers  map[string]*QueueTracker
 }
 
 func newRootQueueTracker() *QueueTracker {
-	return newQueueTracker(configs.RootQueue)
+	qt := newQueueTracker(configs.RootQueue, nil)
+	return qt
 }
 
-func newQueueTracker(queueName string) *QueueTracker {
-	log.Logger().Debug("Creating queue tracker object for queue",
-		zap.String("queue", queueName))
+func newQueueTracker(queueName string, parent *QueueTracker) *QueueTracker {
 	queueTracker := &QueueTracker{
 		queueName:           queueName,
+		parent:              parent,
 		resourceUsage:       resources.NewResource(),
 		runningApplications: make(map[string]bool),
+		maxResources:        resources.NewResource(),
+		maxRunningApps:      0,
 		childQueueTrackers:  make(map[string]*QueueTracker),
 	}
+	parentQueueName := ""
+	if parent != nil {
+		queueTracker.queuePath = queueTracker.parent.queuePath + configs.DOT + queueName
+		parentQueueName = parent.queueName
+	} else if queueName == configs.RootQueue {
+		queueTracker.queuePath = configs.RootQueue
+	}
+	log.Logger().Debug("Created queue tracker object for queue",
+		zap.String("queue", queueName),
+		zap.String("parent", parentQueueName),
+		zap.String("queue path ", queueTracker.queuePath))
 	return queueTracker
 }
 
-func (qt *QueueTracker) increaseTrackedResource(queuePath string, applicationID string, usage *resources.Resource) error {
+type trackingType int
+
+const (
+	none trackingType = iota
+	user
+	group
+)
+
+func (qt *QueueTracker) increaseTrackedResource(queuePath string, applicationID string, trackType trackingType, usage *resources.Resource) bool {
 	log.Logger().Debug("Increasing resource usage",
+		zap.Int("tracking type", int(trackType)),
 		zap.String("queue path", queuePath),
 		zap.String("application", applicationID),
 		zap.Stringer("resource", usage))
 	if queuePath == "" || applicationID == "" || usage == nil {
-		return fmt.Errorf("mandatory parameters are missing. queuepath: %s, application id: %s, resource usage: %s",
-			queuePath, applicationID, usage.String())
+		return false
+	}
+
+	finalResourceUsage := qt.resourceUsage.Clone()
+	finalResourceUsage.AddTo(usage)
+	wildCardQuotaExceeded := false
+

Review Comment:
   Is this a new app to add to the list of running applications?
   ```
   existingApp := runningApplications[appId]
   ```
   Only check that the application count stays within maxRunningApps if `existingApp == false`.



##########
pkg/scheduler/ugm/queue_tracker.go:
##########
@@ -31,38 +31,141 @@ import (
 
 type QueueTracker struct {
 	queueName           string
+	parent              *QueueTracker
+	queuePath           string
 	resourceUsage       *resources.Resource
 	runningApplications map[string]bool
-	maxResourceUsage    *resources.Resource
+	maxResources        *resources.Resource
 	maxRunningApps      uint64
 	childQueueTrackers  map[string]*QueueTracker
 }
 
 func newRootQueueTracker() *QueueTracker {
-	return newQueueTracker(configs.RootQueue)
+	qt := newQueueTracker(configs.RootQueue, nil)
+	return qt
 }
 
-func newQueueTracker(queueName string) *QueueTracker {
-	log.Logger().Debug("Creating queue tracker object for queue",
-		zap.String("queue", queueName))
+func newQueueTracker(queueName string, parent *QueueTracker) *QueueTracker {
 	queueTracker := &QueueTracker{
 		queueName:           queueName,
+		parent:              parent,
 		resourceUsage:       resources.NewResource(),
 		runningApplications: make(map[string]bool),
+		maxResources:        resources.NewResource(),
+		maxRunningApps:      0,
 		childQueueTrackers:  make(map[string]*QueueTracker),
 	}
+	parentQueueName := ""
+	if parent != nil {
+		queueTracker.queuePath = queueTracker.parent.queuePath + configs.DOT + queueName
+		parentQueueName = parent.queueName
+	} else if queueName == configs.RootQueue {
+		queueTracker.queuePath = configs.RootQueue
+	}
+	log.Logger().Debug("Created queue tracker object for queue",
+		zap.String("queue", queueName),
+		zap.String("parent", parentQueueName),
+		zap.String("queue path ", queueTracker.queuePath))
 	return queueTracker
 }
 
-func (qt *QueueTracker) increaseTrackedResource(queuePath string, applicationID string, usage *resources.Resource) error {
+type trackingType int
+
+const (
+	none trackingType = iota
+	user
+	group
+)
+
+func (qt *QueueTracker) increaseTrackedResource(queuePath string, applicationID string, trackType trackingType, usage *resources.Resource) bool {
 	log.Logger().Debug("Increasing resource usage",
+		zap.Int("tracking type", int(trackType)),
 		zap.String("queue path", queuePath),
 		zap.String("application", applicationID),
 		zap.Stringer("resource", usage))
 	if queuePath == "" || applicationID == "" || usage == nil {
-		return fmt.Errorf("mandatory parameters are missing. queuepath: %s, application id: %s, resource usage: %s",
-			queuePath, applicationID, usage.String())
+		return false
+	}
+
+	finalResourceUsage := qt.resourceUsage.Clone()
+	finalResourceUsage.AddTo(usage)
+	wildCardQuotaExceeded := false
+
+	// Are there any settings for specific user/group? If not, try wild card settings
+	if int(qt.maxRunningApps) == 0 && resources.Equals(resources.NewResource(), qt.maxResources) {
+		// Is there any wild card settings? Do we need to apply enforcement checks using wild card limit settings?
+		var config map[string]interface{}
+		if trackType == user {
+			config = m.getUserWildCardLimitsConfig(qt.queuePath)
+		} else if trackType == group {
+			config = m.getGroupWildCardLimitsConfig(qt.queuePath)
+		}
+		if config != nil {
+			var maxResources map[string]string
+			var maxApplications uint64
+			var ok bool
+			if maxApplications, ok = config[maxapplications].(uint64); !ok {
+				log.Logger().Debug("Problem in using the wild card limit max application settings.",
+					zap.String("queue path", queuePath),
+					zap.Int("tracking type", int(trackType)),
+					zap.Any("max applications ", config[maxapplications]))
+			}
+			if maxResources, ok = config[maxresources].(map[string]string); !ok {
+				log.Logger().Debug("Problem in using the wild card limit max resources settings.",
+					zap.String("queue path", queuePath),
+					zap.Int("tracking type", int(trackType)),
+					zap.Any("max applications ", config[maxresources]))
+			}
+			var maxResource *resources.Resource
+			var err error
+			if maxResource, err = resources.NewResourceFromConf(maxResources); err != nil {
+				log.Logger().Debug("Problem in using the wild card limit max resources settings.",
+					zap.String("queue path", queuePath),
+					zap.String("max resources after casting", maxResource.String()),
+					zap.Error(err))
+				return false
+			}
+			wildCardQuotaExceeded = (maxApplications != 0 && len(qt.runningApplications)+1 > int(maxApplications)) || (!resources.Equals(resources.NewResource(), maxResource) && resources.StrictlyGreaterThan(finalResourceUsage, maxResource))
+			log.Logger().Debug("using wild card limit settings as user/group specific limit settings not set",
+				zap.Int("tracking type", int(trackType)),
+				zap.String("queue path", queuePath),
+				zap.Uint64("wild card max running apps", maxApplications),
+				zap.String("wild card max resources", maxResource.String()),
+				zap.Bool("wild card quota exceeded", wildCardQuotaExceeded))
+		}
+	}
+
+	// apply user/group specific limit settings set if configured, otherwise use wild card limit settings
+	if (int(qt.maxRunningApps) != 0 && !resources.Equals(resources.NewResource(), qt.maxResources)) || wildCardQuotaExceeded {

Review Comment:
   No casting to int; just use the uint64.



##########
pkg/scheduler/ugm/queue_tracker.go:
##########
@@ -31,38 +31,141 @@ import (
 
 type QueueTracker struct {
 	queueName           string
+	parent              *QueueTracker
+	queuePath           string
 	resourceUsage       *resources.Resource
 	runningApplications map[string]bool
-	maxResourceUsage    *resources.Resource
+	maxResources        *resources.Resource
 	maxRunningApps      uint64
 	childQueueTrackers  map[string]*QueueTracker
 }
 
 func newRootQueueTracker() *QueueTracker {
-	return newQueueTracker(configs.RootQueue)
+	qt := newQueueTracker(configs.RootQueue, nil)
+	return qt
 }
 
-func newQueueTracker(queueName string) *QueueTracker {
-	log.Logger().Debug("Creating queue tracker object for queue",
-		zap.String("queue", queueName))
+func newQueueTracker(queueName string, parent *QueueTracker) *QueueTracker {
 	queueTracker := &QueueTracker{
 		queueName:           queueName,
+		parent:              parent,
 		resourceUsage:       resources.NewResource(),
 		runningApplications: make(map[string]bool),
+		maxResources:        resources.NewResource(),
+		maxRunningApps:      0,
 		childQueueTrackers:  make(map[string]*QueueTracker),
 	}
+	parentQueueName := ""
+	if parent != nil {
+		queueTracker.queuePath = queueTracker.parent.queuePath + configs.DOT + queueName
+		parentQueueName = parent.queueName
+	} else if queueName == configs.RootQueue {
+		queueTracker.queuePath = configs.RootQueue
+	}
+	log.Logger().Debug("Created queue tracker object for queue",
+		zap.String("queue", queueName),
+		zap.String("parent", parentQueueName),
+		zap.String("queue path ", queueTracker.queuePath))
 	return queueTracker
 }
 
-func (qt *QueueTracker) increaseTrackedResource(queuePath string, applicationID string, usage *resources.Resource) error {
+type trackingType int
+
+const (
+	none trackingType = iota
+	user
+	group
+)
+
+func (qt *QueueTracker) increaseTrackedResource(queuePath string, applicationID string, trackType trackingType, usage *resources.Resource) bool {
 	log.Logger().Debug("Increasing resource usage",
+		zap.Int("tracking type", int(trackType)),
 		zap.String("queue path", queuePath),
 		zap.String("application", applicationID),
 		zap.Stringer("resource", usage))
 	if queuePath == "" || applicationID == "" || usage == nil {
-		return fmt.Errorf("mandatory parameters are missing. queuepath: %s, application id: %s, resource usage: %s",
-			queuePath, applicationID, usage.String())
+		return false
+	}
+
+	finalResourceUsage := qt.resourceUsage.Clone()
+	finalResourceUsage.AddTo(usage)
+	wildCardQuotaExceeded := false
+
+	// Are there any settings for specific user/group? If not, try wild card settings
+	if int(qt.maxRunningApps) == 0 && resources.Equals(resources.NewResource(), qt.maxResources) {
+		// Is there any wild card settings? Do we need to apply enforcement checks using wild card limit settings?
+		var config map[string]interface{}
+		if trackType == user {
+			config = m.getUserWildCardLimitsConfig(qt.queuePath)
+		} else if trackType == group {
+			config = m.getGroupWildCardLimitsConfig(qt.queuePath)
+		}
+		if config != nil {
+			var maxResources map[string]string
+			var maxApplications uint64
+			var ok bool
+			if maxApplications, ok = config[maxapplications].(uint64); !ok {
+				log.Logger().Debug("Problem in using the wild card limit max application settings.",
+					zap.String("queue path", queuePath),
+					zap.Int("tracking type", int(trackType)),
+					zap.Any("max applications ", config[maxapplications]))
+			}
+			if maxResources, ok = config[maxresources].(map[string]string); !ok {
+				log.Logger().Debug("Problem in using the wild card limit max resources settings.",
+					zap.String("queue path", queuePath),
+					zap.Int("tracking type", int(trackType)),
+					zap.Any("max applications ", config[maxresources]))
+			}
+			var maxResource *resources.Resource
+			var err error
+			if maxResource, err = resources.NewResourceFromConf(maxResources); err != nil {
+				log.Logger().Debug("Problem in using the wild card limit max resources settings.",
+					zap.String("queue path", queuePath),
+					zap.String("max resources after casting", maxResource.String()),
+					zap.Error(err))
+				return false
+			}
+			wildCardQuotaExceeded = (maxApplications != 0 && len(qt.runningApplications)+1 > int(maxApplications)) || (!resources.Equals(resources.NewResource(), maxResource) && resources.StrictlyGreaterThan(finalResourceUsage, maxResource))
+			log.Logger().Debug("using wild card limit settings as user/group specific limit settings not set",
+				zap.Int("tracking type", int(trackType)),
+				zap.String("queue path", queuePath),
+				zap.Uint64("wild card max running apps", maxApplications),
+				zap.String("wild card max resources", maxResource.String()),
+				zap.Bool("wild card quota exceeded", wildCardQuotaExceeded))
+		}
+	}
+
+	// apply user/group specific limit settings set if configured, otherwise use wild card limit settings
+	if (int(qt.maxRunningApps) != 0 && !resources.Equals(resources.NewResource(), qt.maxResources)) || wildCardQuotaExceeded {
+		log.Logger().Debug("applying enforcement checks using limit settings",
+			zap.Int("tracking type", int(trackType)),
+			zap.String("queue path", queuePath),
+			zap.Uint64("max running apps", qt.maxRunningApps),
+			zap.String("max resources", qt.maxResources.String()),
+			zap.Bool("wild card quota exceeded", wildCardQuotaExceeded))
+		if (len(qt.runningApplications)+1 > int(qt.maxRunningApps) || resources.StrictlyGreaterThan(finalResourceUsage, qt.maxResources)) || wildCardQuotaExceeded {
+			log.Logger().Warn("Unable to increase resource usage as allowing new application to run would exceed either configured max applications or max resources limit of specific user/group or wild card user/group",
+				zap.String("queue path", queuePath),
+				zap.Int("tracking type", int(trackType)),
+				zap.Int("current running applications", len(qt.runningApplications)),
+				zap.Int("max running applications", int(qt.maxRunningApps)),
+				zap.String("current resource usage", qt.resourceUsage.String()),
+				zap.String("max resource usage", qt.maxResources.String()))
+			return false
+		}
+	}
+
+	childQueuePath, immediateChildQueueName := getChildQueuePath(queuePath)
+	if childQueuePath != "" {
+		if qt.childQueueTrackers[immediateChildQueueName] == nil {
+			qt.childQueueTrackers[immediateChildQueueName] = newQueueTracker(immediateChildQueueName, qt)
+		}
+		result := qt.childQueueTrackers[immediateChildQueueName].increaseTrackedResource(childQueuePath, applicationID, trackType, usage)
+		if !result {
+			return false
+		}

Review Comment:
   Use the `if` statement with an initializer instead:
   ```
   if allowed := qt....; !allowed {
    return allowed
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@yunikorn.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org