Posted to reviews@yunikorn.apache.org by "craigcondit (via GitHub)" <gi...@apache.org> on 2023/02/24 17:27:47 UTC

[GitHub] [yunikorn-core] craigcondit opened a new pull request, #511: [YUNIKORN-1467] [WIP] Implement priority-based victim selection algorithm

craigcondit opened a new pull request, #511:
URL: https://github.com/apache/yunikorn-core/pull/511

   - Update to latest SI
   - Implement getGuaranteedHeadRoom()
   - Rename tryPreemption() -> tryRequiredNodePreemption()
   - Update ask with both allowPreemptSelf / allowPreemptOther policies
   - Remove preemption from partition conf
   - Track last attempted preemption time on ask
   - Walk preemption fence subtree to find potential victims
   - Filter out allocations which don't help meet shortfall
   - Filter out allocations which would reduce queue below guaranteed amounts
   - Filter out nodes which have no allocations or cannot satisfy request
   - Sort nodes by number of tasks which can be preempted
   - Call shim to check predicates
   - Limit number of preemption attempts to 10 requests per queue
   - Revert YUNIKORN-1465 as we no longer need per-app preemption priorities
   - Revert YUNIKORN-1466 as we no longer need per-queue preemption priorities
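
The filtering and sorting steps listed above can be sketched roughly as follows. This is a minimal illustration and not the code in this PR: `Resource`, `Victim`, `shortfall`, `helpfulVictims` and `sortNodesByVictimCount` are hypothetical names, resources are modelled as plain integer maps, and the ascending sort order (fewest victims first) is an assumption.

```go
package main

import "sort"

// Resource is a hypothetical stand-in for the scheduler's resource type.
type Resource map[string]int64

// Victim is a hypothetical running allocation that could be preempted.
type Victim struct {
	ID  string
	Res Resource
}

// shortfall sums, over every resource in ask, the amount that does not fit in available.
func shortfall(ask, available Resource) int64 {
	var missing int64
	for name, want := range ask {
		if gap := want - available[name]; gap > 0 {
			missing += gap
		}
	}
	return missing
}

// helpfulVictims keeps, in order, only the victims whose removal shrinks the remaining shortfall.
func helpfulVictims(ask, available Resource, victims []Victim) []Victim {
	// work on a copy so the caller's view of the node is left untouched
	free := Resource{}
	for name, value := range available {
		free[name] = value
	}
	kept := make([]Victim, 0, len(victims))
	for _, v := range victims {
		before := shortfall(ask, free)
		for name, value := range v.Res {
			free[name] += value
		}
		if shortfall(ask, free) < before {
			kept = append(kept, v)
			continue
		}
		// no improvement: undo the speculative release
		for name, value := range v.Res {
			free[name] -= value
		}
	}
	return kept
}

// sortNodesByVictimCount orders node IDs so that nodes with fewer victims come first
// (assumed direction), breaking ties by node ID to keep the result deterministic.
func sortNodesByVictimCount(victimsByNode map[string][]Victim) []string {
	nodes := make([]string, 0, len(victimsByNode))
	for id := range victimsByNode {
		nodes = append(nodes, id)
	}
	sort.Slice(nodes, func(i, j int) bool {
		a, b := len(victimsByNode[nodes[i]]), len(victimsByNode[nodes[j]])
		if a != b {
			return a < b
		}
		return nodes[i] < nodes[j]
	})
	return nodes
}
```

Summing the per-resource gaps is only used to detect whether a victim helps at all; since releasing resources can never increase any single gap, the sum decreases exactly when at least one gap does.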
   
   TODO:
   - Unit testing (minimal unit tests present, not yet complete)
   - E2E testing in shim (not started)
   - Queue doesn't yet track resources for in-flight preemptions. This is necessary to avoid race conditions as we might preempt too much in a queue.
   - Currently, preemption will only be triggered if there are no nodes capable of running a task. It will NOT be triggered in the case where only queue limits apply, as the initial check that a task is within limits prior to scheduling can't detect things like a sibling queue that is over guaranteed limits eating up a parent's max resources. Preemption should be allowed in this case, but the current node-centric algorithm doesn't handle this. We need some more design work on this scenario.
   
   Issues:
   - It appears that we get a reservation for a node even though that node is not a fit, resulting in the node being excluded from consideration due to the node iterator skipping reserved nodes.
   - Configuration currently disallows parent guaranteed limits to be below the sum of child queue guarantees. Is this something we want to keep?
   
   ### What type of PR is it?
   * [ ] - Bug Fix
   * [ ] - Improvement
   * [x] - Feature
   * [ ] - Documentation
   * [ ] - Hot Fix
   * [ ] - Refactoring
   
   ### Todos
   * [ ] - Task
   
   ### What is the Jira issue?
   https://issues.apache.org/jira/browse/YUNIKORN-1467
   
   ### How should this be tested?
   TODO 
   
   ### Screenshots (if appropriate)
   
   ### Questions:
   * [ ] - The license files need an update.
   * [ ] - There are breaking changes for older versions.
   * [ ] - It needs documentation.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@yunikorn.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [yunikorn-core] craigcondit commented on a diff in pull request #511: [YUNIKORN-1467] Implement core preemption logic

Posted by "craigcondit (via GitHub)" <gi...@apache.org>.
craigcondit commented on code in PR #511:
URL: https://github.com/apache/yunikorn-core/pull/511#discussion_r1149584303


##########
pkg/scheduler/partition_test.go:
##########
@@ -123,7 +123,6 @@ func TestNewWithPlacement(t *testing.T) {
 			},

Review Comment:
   Added preemption-centric test.





[GitHub] [yunikorn-core] pbacsko commented on a diff in pull request #511: [YUNIKORN-1467] Implement core preemption logic

Posted by "pbacsko (via GitHub)" <gi...@apache.org>.
pbacsko commented on code in PR #511:
URL: https://github.com/apache/yunikorn-core/pull/511#discussion_r1147490235


##########
pkg/metrics/queue.go:
##########
@@ -131,3 +131,11 @@ func (m *QueueMetrics) SetQueuePendingResourceMetrics(resourceName string, value
 func (m *QueueMetrics) AddQueuePendingResourceMetrics(resourceName string, value float64) {
 	m.ResourceMetrics.With(prometheus.Labels{"state": "pending", "resource": resourceName}).Add(value)
 }
+
+func (m *QueueMetrics) SetQueuePreemptingResourceMetrics(resourceName string, value float64) {
+	m.ResourceMetrics.With(prometheus.Labels{"state": "preempting", "resource": resourceName}).Set(value)
+}
+
+func (m *QueueMetrics) AddQueuePreemptingResourceMetrics(resourceName string, value float64) {

Review Comment:
   This method doesn't seem to be called from anywhere





[GitHub] [yunikorn-core] pbacsko commented on a diff in pull request #511: [YUNIKORN-1467] Implement core preemption logic

Posted by "pbacsko (via GitHub)" <gi...@apache.org>.
pbacsko commented on code in PR #511:
URL: https://github.com/apache/yunikorn-core/pull/511#discussion_r1147595388


##########
pkg/scheduler/objects/queue.go:
##########
@@ -41,29 +40,32 @@ import (
 	"github.com/apache/yunikorn-scheduler-interface/lib/go/common"
 )
 
+var (
+	maxPreemptionsPerQueue = 10 // maximum number of asks to attempt to preempt for in a single queue
+)
+
 // Queue structure inside Scheduler
 type Queue struct {
 	QueuePath string // Fully qualified path for the queue
 	Name      string // Queue name as in the config etc.
 
 	// Private fields need protection
-	sortType                  policies.SortPolicy       // How applications (leaf) or queues (parents) are sorted
-	children                  map[string]*Queue         // Only for direct children, parent queue only
-	childPriorities           map[string]int32          // cached priorities for child queues
-	childPreemptionPriorities map[string]int32          // cached preemption priorities for child queues
-	applications              map[string]*Application   // only for leaf queue
-	appPriorities             map[string]int32          // cached priorities for application
-	appPreemptionPriorities   map[string]int32          // cached preemption priorities for application
-	reservedApps              map[string]int            // applications reserved within this queue, with reservation count
-	parent                    *Queue                    // link back to the parent in the scheduler
-	pending                   *resources.Resource       // pending resource for the apps in the queue
-	prioritySortEnabled       bool                      // whether priority is used for request sorting
-	priorityPolicy            policies.PriorityPolicy   // priority policy
-	priorityOffset            int32                     // priority offset for this queue relative to others
-	preemptionPolicy          policies.PreemptionPolicy // preemption policy
-	preemptionDelay           time.Duration             // time before preemption is considered
-	currentPriority           int32                     // the current scheduling priority of this queue
-	preemptionPriority        int32                     // the current preemption priority of this queue
+	sortType            policies.SortPolicy       // How applications (leaf) or queues (parents) are sorted
+	children            map[string]*Queue         // Only for direct children, parent queue only
+	childPriorities     map[string]int32          // cached priorities for child queues
+	applications        map[string]*Application   // only for leaf queue
+	appPriorities       map[string]int32          // cached priorities for application
+	reservedApps        map[string]int            // applications reserved within this queue, with reservation count
+	parent              *Queue                    // link back to the parent in the scheduler
+	pending             *resources.Resource       // pending resource for the apps in the queue
+	allocatedResource   *resources.Resource       // allocated resource for the apps in the queue
+	preemptingResource  *resources.Resource       // preempting resource for the apps in the queue

Review Comment:
   The thing I'd really like to see tested (possibly from a future test case in `partition_test.go`) is this resource tracking. After a completed preemption, this should go back to zero. Not sure if we have something similar for `allocatedResource`, but for this, I think it's great to have.
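
A rough sketch of the tracking behavior such a test could assert, assuming a single resource dimension. `queueUsage` and its methods are hypothetical and are not the `Queue` API in this PR; in particular, decrementing both counters when a preemption completes is an assumption.

```go
package main

// queueUsage is a hypothetical single-resource model of a queue's counters.
type queueUsage struct {
	allocated  int64 // resources currently allocated in the queue
	preempting int64 // resources flagged for preemption but not yet released
	guaranteed int64 // guaranteed resources for the queue
}

// preemptable returns how much may still be flagged for preemption without taking
// the queue below its guarantee, counting in-flight preemptions as already gone.
func (q *queueUsage) preemptable() int64 {
	free := q.allocated - q.preempting - q.guaranteed
	if free < 0 {
		return 0
	}
	return free
}

// markPreempting flags amount as in-flight and reports whether that was allowed.
func (q *queueUsage) markPreempting(amount int64) bool {
	if amount <= 0 || amount > q.preemptable() {
		return false
	}
	q.preempting += amount
	return true
}

// completePreemption records that a victim released amount (assumed behavior:
// both the in-flight counter and the allocated counter shrink).
func (q *queueUsage) completePreemption(amount int64) {
	if amount > q.preempting {
		amount = q.preempting
	}
	q.preempting -= amount
	q.allocated -= amount
}
```

Under these assumptions, a test would flag some resources, complete the preemption, and then check that the in-flight counter is back at zero while the allocated counter has dropped by the same amount.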





[GitHub] [yunikorn-core] craigcondit commented on pull request #511: [YUNIKORN-1467] Implement core preemption logic

Posted by "craigcondit (via GitHub)" <gi...@apache.org>.
craigcondit commented on PR #511:
URL: https://github.com/apache/yunikorn-core/pull/511#issuecomment-1486622720

   >
   > While going through this big chunk of changes, I felt that there is a lot of room for improvement from a code simplification perspective. Since the effort is big, it is not realistic to expect that in the first cut, but I do expect that it will happen over future iterations. 
   
   The code is optimized for efficiency as we take great pains to avoid expensive computations until later in the cycle. This code is very much on the hot path of the scheduler so we make every attempt to abort early if preemption cannot be done. Combining the methods you cited would require more work to be done upfront and very likely make the entire process less performant -- remember that the normal case is that we do NOT find a preemption solution. Preemption itself is rare and so we want to short-circuit out as soon as we can, even at the expense of a preemption taking longer. 
   
   > For example, could the findEligiblePreemptionVictims, calculateAdditionalVictims and tryNodes methods be refactored to finalize the victims as early as possible, rather than taking some of the decisions at later stages and rejecting them then?
   
   This is by design as we cannot know until predicate checks are run which solutions (if any) are viable. We gather all the possible victims in each node first as we might need to check more pods than expected due to predicate constraints. 
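
To illustrate why the full victim list is gathered before predicates run, here is a simplified sketch using a single resource dimension. `startIndex` and `victimsNeeded` are hypothetical helpers, and the way the predicate check extends the victim set one task at a time is an assumption for illustration, not a description of the shim.

```go
package main

// startIndex returns the index of the first victim at which the ask fits on the node
// when victims are released in order, or -1 if it never fits. It assumes the caller
// has already established that the ask does not fit without preemption.
func startIndex(ask, available int64, victims []int64) int {
	for i, size := range victims {
		available += size
		if available >= ask {
			return i
		}
	}
	return -1
}

// victimsNeeded starts with the victims up to and including start and keeps adding
// one more while the predicate rejects the placement. It returns the number of
// victims finally required, or -1 if no prefix of the list satisfies the predicate.
func victimsNeeded(start int, count int, predicate func(preempted int) bool) int {
	if start < 0 {
		return -1
	}
	for n := start + 1; n <= count; n++ {
		if predicate(n) {
			return n
		}
	}
	return -1
}
```

Resource arithmetic alone gives only a lower bound on the number of victims; a constraint visible only to the predicate (for example an anti-affinity rule) can force additional tasks off the node, which is why later candidates are kept available.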
   
   > This would avoid redundant checks (they seem redundant, but I need to go into detail to confirm this) and thereby increase the overall efficiency of the preemption cycle.
   
   The checks may seem redundant but this is to give us every opportunity to short-circuit out early. They are structured to avoid doing expensive computations early in the process. 
   
   > Is there a way to avoid doing preemptPredicateCheck for all batches? Can we avoid doing it for the rest of the batches if we are done with the current batch being processed?
   
   We already do this. Once a batch is processed we evaluate the solutions found (if any) and stop if a preemption solution that is "good enough" is discovered. We only continue if there were no optimal solutions found. 
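
The batch-and-stop pattern can be sketched as below. This is a simplified stand-in, not the PR's implementation: `candidate`, `splitBatches` and `firstGoodBatch` are hypothetical names, and any passing candidate is treated as "good enough", which is a simplification.

```go
package main

import "sync"

// candidate is a hypothetical stand-in for one per-node predicate check.
type candidate struct {
	NodeID  string
	Victims int // number of tasks that would be preempted on this node
}

// splitBatches splits candidates into consecutive groups of at most size entries.
func splitBatches(all []candidate, size int) [][]candidate {
	if size < 1 {
		size = 1
	}
	var out [][]candidate
	for start := 0; start < len(all); start += size {
		end := start + size
		if end > len(all) {
			end = len(all)
		}
		out = append(out, all[start:end])
	}
	return out
}

// firstGoodBatch evaluates one batch at a time, running the predicate concurrently
// within a batch. It stops at the first batch containing a passing candidate and
// returns the passing candidate with the fewest victims plus the number of batches run.
func firstGoodBatch(all []candidate, size int, predicate func(candidate) bool) (*candidate, int) {
	evaluated := 0
	for _, batch := range splitBatches(all, size) {
		evaluated++
		results := make(chan candidate, len(batch)) // buffered so senders never block
		var wg sync.WaitGroup
		for _, c := range batch {
			wg.Add(1)
			go func(c candidate) {
				defer wg.Done()
				if predicate(c) {
					results <- c
				}
			}(c)
		}
		wg.Wait()
		close(results)

		var best *candidate
		for c := range results {
			c := c // copy so the pointer below does not alias the loop variable
			if best == nil || c.Victims < best.Victims ||
				(c.Victims == best.Victims && c.NodeID < best.NodeID) {
				best = &c
			}
		}
		if best != nil {
			return best, evaluated // later batches are never sent to the predicate
		}
	}
	return nil, evaluated
}
```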
   
   Additionally, to avoid repeated expensive computations, we only ever try to preempt for a single pod once every 15 seconds, and only check a maximum of 10 pods in any one queue. 
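
A minimal sketch of these two limits, assuming the per-queue cap is counted per scheduling cycle. `attemptGate`, `startCycle`, `allow` and `at` are hypothetical names; only the values 15 seconds and 10 come from the diff (`preemptAttemptFrequency`, `maxPreemptionsPerQueue`).

```go
package main

import "time"

const (
	attemptInterval = 15 * time.Second // mirrors preemptAttemptFrequency in the diff
	maxAsksPerQueue = 10               // mirrors maxPreemptionsPerQueue in the diff
)

// attemptGate is a hypothetical helper that rate-limits preemption attempts for one queue.
type attemptGate struct {
	lastCheck map[string]time.Time // last attempt time per ask key
	attempts  int                  // asks attempted in the current cycle (assumed scope)
}

func newAttemptGate() *attemptGate {
	return &attemptGate{lastCheck: make(map[string]time.Time)}
}

// startCycle resets the per-queue counter at the start of a scheduling cycle.
func (g *attemptGate) startCycle() {
	g.attempts = 0
}

// allow reports whether a preemption attempt may run for askKey at time now.
// If it may, the attempt is recorded so that later calls are throttled.
func (g *attemptGate) allow(askKey string, now time.Time) bool {
	if g.attempts >= maxAsksPerQueue {
		return false
	}
	if last, ok := g.lastCheck[askKey]; ok && now.Before(last.Add(attemptInterval)) {
		return false
	}
	g.lastCheck[askKey] = now
	g.attempts++
	return true
}

// at returns a fixed instant offset by the given number of seconds; it exists only
// to make the example deterministic.
func at(seconds int) time.Time {
	return time.Unix(int64(seconds), 0)
}
```

Both checks are cheap map and counter lookups, so they can run before any snapshotting or predicate work is started.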
   




[GitHub] [yunikorn-core] pbacsko commented on a diff in pull request #511: [YUNIKORN-1467] Implement core preemption logic

Posted by "pbacsko (via GitHub)" <gi...@apache.org>.
pbacsko commented on code in PR #511:
URL: https://github.com/apache/yunikorn-core/pull/511#discussion_r1147595388


##########
pkg/scheduler/objects/queue.go:
##########
@@ -41,29 +40,32 @@ import (
 	"github.com/apache/yunikorn-scheduler-interface/lib/go/common"
 )
 
+var (
+	maxPreemptionsPerQueue = 10 // maximum number of asks to attempt to preempt for in a single queue
+)
+
 // Queue structure inside Scheduler
 type Queue struct {
 	QueuePath string // Fully qualified path for the queue
 	Name      string // Queue name as in the config etc.
 
 	// Private fields need protection
-	sortType                  policies.SortPolicy       // How applications (leaf) or queues (parents) are sorted
-	children                  map[string]*Queue         // Only for direct children, parent queue only
-	childPriorities           map[string]int32          // cached priorities for child queues
-	childPreemptionPriorities map[string]int32          // cached preemption priorities for child queues
-	applications              map[string]*Application   // only for leaf queue
-	appPriorities             map[string]int32          // cached priorities for application
-	appPreemptionPriorities   map[string]int32          // cached preemption priorities for application
-	reservedApps              map[string]int            // applications reserved within this queue, with reservation count
-	parent                    *Queue                    // link back to the parent in the scheduler
-	pending                   *resources.Resource       // pending resource for the apps in the queue
-	prioritySortEnabled       bool                      // whether priority is used for request sorting
-	priorityPolicy            policies.PriorityPolicy   // priority policy
-	priorityOffset            int32                     // priority offset for this queue relative to others
-	preemptionPolicy          policies.PreemptionPolicy // preemption policy
-	preemptionDelay           time.Duration             // time before preemption is considered
-	currentPriority           int32                     // the current scheduling priority of this queue
-	preemptionPriority        int32                     // the current preemption priority of this queue
+	sortType            policies.SortPolicy       // How applications (leaf) or queues (parents) are sorted
+	children            map[string]*Queue         // Only for direct children, parent queue only
+	childPriorities     map[string]int32          // cached priorities for child queues
+	applications        map[string]*Application   // only for leaf queue
+	appPriorities       map[string]int32          // cached priorities for application
+	reservedApps        map[string]int            // applications reserved within this queue, with reservation count
+	parent              *Queue                    // link back to the parent in the scheduler
+	pending             *resources.Resource       // pending resource for the apps in the queue
+	allocatedResource   *resources.Resource       // allocated resource for the apps in the queue
+	preemptingResource  *resources.Resource       // preempting resource for the apps in the queue

Review Comment:
   The thing I'd really like to see tested (possibly inside a future test case in `partition_test.go`) is this resource tracking. After a completed preemption, this should go back to zero. Not sure if we have something similar which checks `allocatedResource`, but for this, I think it's great to have.





[GitHub] [yunikorn-core] craigcondit commented on a diff in pull request #511: [YUNIKORN-1467] Implement core preemption logic

Posted by "craigcondit (via GitHub)" <gi...@apache.org>.
craigcondit commented on code in PR #511:
URL: https://github.com/apache/yunikorn-core/pull/511#discussion_r1148074481


##########
pkg/scheduler/objects/preemption.go:
##########
@@ -0,0 +1,875 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package objects
+
+import (
+	"sort"
+	"sync"
+	"time"
+
+	"go.uber.org/zap"
+
+	"github.com/apache/yunikorn-core/pkg/common/resources"
+	"github.com/apache/yunikorn-core/pkg/log"
+	"github.com/apache/yunikorn-core/pkg/plugins"
+	"github.com/apache/yunikorn-scheduler-interface/lib/go/api"
+	"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
+)
+
+var (
+	preemptAttemptFrequency        = 15 * time.Second
+	preemptCheckConcurrency        = 10
+	scoreFitMax             uint64 = 1 << 32
+	scoreOriginator         uint64 = 1 << 33
+	scoreNoPreempt          uint64 = 1 << 34
+	scoreUnfit              uint64 = 1 << 35
+)
+
+// Preemptor encapsulates the functionality required for preemption victim selection
+type Preemptor struct {
+	application     *Application        // application containing ask
+	queue           *Queue              // queue to preempt for
+	queuePath       string              // path of queue to preempt for
+	headRoom        *resources.Resource // current queue headroom
+	preemptionDelay time.Duration       // preemption delay
+	ask             *AllocationAsk      // ask to be preempted for
+	iterator        NodeIterator        // iterator to enumerate all nodes
+	nodesTried      bool                // flag indicating that scheduling has already been tried on all nodes
+
+	// lazily-populated work structures
+	allocationsByQueue map[string]*QueuePreemptionSnapshot // map of queue snapshots by queue path
+	queueByAlloc       map[string]*QueuePreemptionSnapshot // map of queue snapshots by allocationID
+	allocationsByNode  map[string][]*Allocation            // map of allocation by nodeID
+	nodeAvailableMap   map[string]*resources.Resource      // map of available resources by nodeID
+}
+
+// QueuePreemptionSnapshot is used to track a snapshot of a queue for preemption
+type QueuePreemptionSnapshot struct {
+	Parent             *QueuePreemptionSnapshot // snapshot of parent queue
+	QueuePath          string                   // fully qualified path to queue
+	AllocatedResource  *resources.Resource      // allocated resources
+	PreemptingResource *resources.Resource      // resources currently flagged for preemption
+	MaxResource        *resources.Resource      // maximum resources for this queue
+	GuaranteedResource *resources.Resource      // guaranteed resources for this queue
+	PotentialVictims   []*Allocation            // list of allocations which could be preempted
+}
+
+// NewPreemptor creates a new preemptor. The preemptor itself is not thread safe, and assumes the application lock is held.
+func NewPreemptor(application *Application, headRoom *resources.Resource, preemptionDelay time.Duration, ask *AllocationAsk, iterator NodeIterator, nodesTried bool) *Preemptor {
+	return &Preemptor{
+		application:     application,
+		queue:           application.queue,
+		queuePath:       application.queuePath,
+		headRoom:        headRoom,
+		preemptionDelay: preemptionDelay,
+		ask:             ask,
+		iterator:        iterator,
+		nodesTried:      nodesTried,
+	}
+}
+
+// CheckPreconditions performs simple sanity checks designed to determine if preemption should be attempted
+// for an ask. If checks succeed, updates the ask preemption check time.
+func (p *Preemptor) CheckPreconditions() bool {
+	now := time.Now()
+
+	// skip if ask is not allowed to preempt other tasks
+	if !p.ask.IsAllowPreemptOther() {
+		return false
+	}
+
+	// skip if ask has previously triggered preemption
+	if p.ask.HasTriggeredPreemption() {
+		return false
+	}
+
+	// skip if ask requires a specific node (this should be handled by required node preemption algorithm)
+	if p.ask.GetRequiredNode() != "" {
+		return false
+	}
+
+	// skip if preemption delay has not yet passed
+	if now.Before(p.ask.GetCreateTime().Add(p.preemptionDelay)) {
+		return false
+	}
+
+	// skip if attempt frequency hasn't been reached again
+	if now.Before(p.ask.GetPreemptCheckTime().Add(preemptAttemptFrequency)) {
+		return false
+	}
+
+	// mark this ask as having been checked recently to avoid doing extra work in the next scheduling cycle
+	p.ask.UpdatePreemptCheckTime()
+
+	return true
+}
+
+// initQueueSnapshots ensures that snapshots have been taken of the queue
+func (p *Preemptor) initQueueSnapshots() {
+	if p.allocationsByQueue != nil {
+		return
+	}
+
+	p.allocationsByQueue = p.queue.FindEligiblePreemptionVictims(p.queuePath, p.ask)
+}
+
+// initWorkingState builds helper data structures required to compute a solution
+func (p *Preemptor) initWorkingState() {
+	// return if we have already run
+	if p.nodeAvailableMap != nil {
+		return
+	}
+
+	// ensure queue snapshots are populated
+	p.initQueueSnapshots()
+
+	allocationsByNode := make(map[string][]*Allocation)
+	queueByAlloc := make(map[string]*QueuePreemptionSnapshot)
+	nodeAvailableMap := make(map[string]*resources.Resource)
+
+	// build a map from NodeID to allocation and from allocationID to queue capacities
+	for _, victims := range p.allocationsByQueue {
+		for _, allocation := range victims.PotentialVictims {
+			nodeID := allocation.GetNodeID()
+			allocations, ok := allocationsByNode[nodeID]
+			if !ok {
+				allocations = make([]*Allocation, 0)
+			}
+			allocationsByNode[nodeID] = append(allocations, allocation)
+			queueByAlloc[allocation.GetAllocationKey()] = victims
+		}
+	}
+
+	// walk node iterator and track available resources per node
+	for p.iterator.HasNext() {
+		node := p.iterator.Next()
+		if !node.IsSchedulable() || (node.IsReserved() && !node.isReservedForApp(reservationKey(nil, p.application, p.ask))) {
+			// node is not available, remove any potential victims from consideration
+			delete(allocationsByNode, node.NodeID)
+		} else {
+			// track allocated and available resources
+			nodeAvailableMap[node.NodeID] = node.GetAvailableResource()
+		}
+	}
+
+	// sort the allocations on each node in the order we'd like to try them
+	sortVictimsForPreemption(allocationsByNode)
+
+	p.allocationsByNode = allocationsByNode
+	p.queueByAlloc = queueByAlloc
+	p.nodeAvailableMap = nodeAvailableMap
+}
+
+// checkPreemptionQueueGuarantees verifies that it's possible to free enough resources to fit the given ask
+func (p *Preemptor) checkPreemptionQueueGuarantees() bool {
+	p.initQueueSnapshots()
+
+	queues := p.duplicateQueueSnapshots()
+	currentQueue, ok := queues[p.queuePath]
+	if !ok {
+		log.Logger().Warn("BUG: Didn't find current queue in snapshot list",
+			zap.String("queuePath", p.queuePath))
+		return false
+	}
+
+	currentQueue.AddAllocation(p.ask.GetAllocatedResource())
+
+	// remove each allocation in turn, validating that at some point we free enough resources to allow this ask to fit
+	for _, snapshot := range queues {
+		for _, alloc := range snapshot.PotentialVictims {
+			snapshot.RemoveAllocation(alloc.GetAllocatedResource())
+			if currentQueue.IsWithinGuaranteedResource() {
+				return true
+			}
+		}
+	}
+
+	return false
+}
+
+// calculateVictimsByNode takes a list of potential victims for a node and builds a list ready for the RM to process.
+// Result is a list of allocations and the starting index to check for the initial preemption list.
+// If the result is nil, the node should not be considered for preemption.
+func (p *Preemptor) calculateVictimsByNode(nodeAvailable *resources.Resource, potentialVictims []*Allocation) (int, []*Allocation) {
+	nodeCurrentAvailable := nodeAvailable.Clone()
+	allocationsByQueueSnap := p.duplicateQueueSnapshots()
+
+	// Initial check: Will allocation fit on node without preemption? This is possible if preemption was triggered due
+	// to queue limits and not node resource limits.
+	if resources.FitIn(nodeCurrentAvailable, p.ask.GetAllocatedResource()) {
+		// return empty list so this node is considered for preemption
+		return -1, make([]*Allocation, 0)
+	}
+
+	// get the current queue snapshot
+	askQueue, ok := allocationsByQueueSnap[p.queuePath]
+	if !ok {
+		log.Logger().Warn("BUG: Queue not found by name", zap.String("queuePath", p.queuePath))
+		return -1, nil
+	}
+
+	// speculatively add the current ask
+	askQueue.AddAllocation(p.ask.GetAllocatedResource())
+
+	// First pass: Check each task to see whether we are able to reduce our shortfall by preempting each
+	// task in turn, and filter out tasks which will cause their queue to drop below guaranteed capacity.
+	// If a task could be preempted without violating queue constraints, add it to either the 'head' list or the
+	// 'tail' list depending on whether the shortfall is reduced. If added to the 'head' list, adjust the node available
+	// capacity and the queue guaranteed headroom.
+	head := make([]*Allocation, 0)
+	tail := make([]*Allocation, 0)
+	for _, victim := range potentialVictims {
+		// check to see if removing this task will keep queue above guaranteed amount; if not, skip to the next one
+		if qv, ok := p.queueByAlloc[victim.GetAllocationKey()]; ok {
+			if queueSnapshot, ok2 := allocationsByQueueSnap[qv.QueuePath]; ok2 {
+				queueSnapshot.RemoveAllocation(victim.GetAllocatedResource())
+				// did removing this allocation still keep the queue over-allocated?
+				if queueSnapshot.IsAtOrAboveGuaranteedResource() {
+					// check to see if the shortfall on the node has changed
+					shortfall := resources.SubEliminateNegative(p.ask.GetAllocatedResource(), nodeCurrentAvailable)
+					newAvailable := resources.Add(nodeCurrentAvailable, victim.GetAllocatedResource())
+					newShortfall := resources.SubEliminateNegative(p.ask.GetAllocatedResource(), newAvailable)
+					if resources.EqualsOrEmpty(shortfall, newShortfall) {
+						// shortfall did not change, so task should only be considered as a last resort
+						queueSnapshot.AddAllocation(victim.GetAllocatedResource())
+						tail = append(tail, victim)
+					} else {
+						// shortfall was decreased, so we should keep this task on the main list and adjust usage
+						nodeCurrentAvailable.AddTo(victim.GetAllocatedResource())
+						head = append(head, victim)
+					}
+				} else {
+					// removing this allocation would have reduced queue below guaranteed limits, put it back
+					queueSnapshot.AddAllocation(victim.GetAllocatedResource())
+				}
+			}
+		}
+	}
+	// merge lists
+	head = append(head, tail...)
+	if len(head) == 0 {
+		return -1, nil
+	}
+
+	// clone again
+	nodeCurrentAvailable = nodeAvailable.Clone()
+	allocationsByQueueSnap = p.duplicateQueueSnapshots()
+
+	// get the current queue snapshot
+	askQueue, ok2 := allocationsByQueueSnap[p.queuePath]
+	if !ok2 {
+		log.Logger().Warn("BUG: Queue not found by name", zap.String("queuePath", p.queuePath))
+		return -1, nil
+	}
+
+	// speculatively add the current ask
+	askQueue.AddAllocation(p.ask.GetAllocatedResource())
+
+	// Second pass: The task ordering can no longer change. For each task, check that queue constraints would not be
+	// violated if the task were to be preempted. If so, discard the task. If the task can be preempted, adjust
+	// both the node available capacity and the queue headroom. Save the Index within the results of the first task
+	// which would reduce the shortfall to zero.
+	results := make([]*Allocation, 0)
+	index := -1
+	for _, victim := range head {
+		// check to see if removing this task will keep queue above guaranteed amount; if not, skip to the next one
+		if qv, ok := p.queueByAlloc[victim.GetAllocationKey()]; ok {
+			if queueSnapshot, ok2 := allocationsByQueueSnap[qv.QueuePath]; ok2 {
+				queueSnapshot.RemoveAllocation(victim.GetAllocatedResource())
+				if queueSnapshot.IsAtOrAboveGuaranteedResource() {
+					// removing task does not violate queue constraints, adjust queue and node
+					nodeCurrentAvailable.AddTo(victim.GetAllocatedResource())
+					// check if ask now fits and we haven't had this happen before
+					if nodeCurrentAvailable.FitInMaxUndef(p.ask.GetAllocatedResource()) && index < 0 {
+						index = len(results)
+					}
+					// add victim to results
+					results = append(results, victim)
+				} else {
+					// add back resources
+					queueSnapshot.AddAllocation(victim.GetAllocatedResource())
+				}
+			}
+		}
+	}
+
+	// check to see if enough resources were freed
+	if index < 0 {
+		return -1, nil
+	}
+
+	return index, results
+}
+
+func (p *Preemptor) duplicateQueueSnapshots() map[string]*QueuePreemptionSnapshot {
+	cache := make(map[string]*QueuePreemptionSnapshot, 0)
+	for _, snapshot := range p.allocationsByQueue {
+		snapshot.Duplicate(cache)
+	}
+	return cache
+}
+
+// checkPreemptionPredicates calls the shim via the SI to evaluate nodes for preemption
+func (p *Preemptor) checkPreemptionPredicates(predicateChecks []*si.PreemptionPredicatesArgs, victimsByNode map[string][]*Allocation) *predicateCheckResult {
+	// don't process empty list
+	if len(predicateChecks) == 0 {
+		return nil
+	}
+
+	// sort predicate checks by number of expected preempted tasks
+	sort.SliceStable(predicateChecks, func(i int, j int) bool {
+		return predicateChecks[i].StartIndex < predicateChecks[j].StartIndex
+	})
+
+	// check for RM callback
+	plugin := plugins.GetResourceManagerCallbackPlugin()
+	if plugin == nil {
+		// if a plugin isn't registered, assume checks will succeed and synthesize a result
+		check := predicateChecks[0]
+		log.Logger().Debug("No RM callback plugin registered, using first selected node for preemption",
+			zap.String("NodeID", check.NodeID),
+			zap.String("AllocationKey", check.AllocationKey))
+
+		result := &predicateCheckResult{
+			allocationKey: check.AllocationKey,
+			nodeID:        check.NodeID,
+			success:       true,
+			index:         int(check.StartIndex),
+		}
+		result.populateVictims(victimsByNode)
+		return result
+	}
+
+	// process each batch of checks by sending to the RM
+	batches := batchPreemptionChecks(predicateChecks, preemptCheckConcurrency)
+	var bestResult *predicateCheckResult = nil
+	for _, batch := range batches {
+		var wg sync.WaitGroup
+		ch := make(chan *predicateCheckResult, len(batch))
+		expected := 0
+		for _, args := range batch {
+			// add goroutine for checking preemption
+			wg.Add(1)
+			expected++
+			go preemptPredicateCheck(plugin, ch, &wg, args)
+		}
+		// wait for completion and close channel
+		go func() {
+			wg.Wait()
+			close(ch)
+		}()
+		for result := range ch {
+			// if result is successful, keep track of it
+			if result.success {
+				if bestResult == nil {
+					bestResult = result
+				} else if result.betterThan(bestResult, p.allocationsByNode) {
+					bestResult = result

Review Comment:
   Added additional test in latest commit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@yunikorn.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [yunikorn-core] craigcondit commented on a diff in pull request #511: [YUNIKORN-1467] Implement core preemption logic

Posted by "craigcondit (via GitHub)" <gi...@apache.org>.
craigcondit commented on code in PR #511:
URL: https://github.com/apache/yunikorn-core/pull/511#discussion_r1149584594


##########
pkg/scheduler/objects/queue.go:
##########
@@ -41,29 +40,32 @@ import (
 	"github.com/apache/yunikorn-scheduler-interface/lib/go/common"
 )
 
+var (
+	maxPreemptionsPerQueue = 10 // maximum number of asks to attempt to preempt for in a single queue
+)
+
 // Queue structure inside Scheduler
 type Queue struct {
 	QueuePath string // Fully qualified path for the queue
 	Name      string // Queue name as in the config etc.
 
 	// Private fields need protection
-	sortType                  policies.SortPolicy       // How applications (leaf) or queues (parents) are sorted
-	children                  map[string]*Queue         // Only for direct children, parent queue only
-	childPriorities           map[string]int32          // cached priorities for child queues
-	childPreemptionPriorities map[string]int32          // cached preemption priorities for child queues
-	applications              map[string]*Application   // only for leaf queue
-	appPriorities             map[string]int32          // cached priorities for application
-	appPreemptionPriorities   map[string]int32          // cached preemption priorities for application
-	reservedApps              map[string]int            // applications reserved within this queue, with reservation count
-	parent                    *Queue                    // link back to the parent in the scheduler
-	pending                   *resources.Resource       // pending resource for the apps in the queue
-	prioritySortEnabled       bool                      // whether priority is used for request sorting
-	priorityPolicy            policies.PriorityPolicy   // priority policy
-	priorityOffset            int32                     // priority offset for this queue relative to others
-	preemptionPolicy          policies.PreemptionPolicy // preemption policy
-	preemptionDelay           time.Duration             // time before preemption is considered
-	currentPriority           int32                     // the current scheduling priority of this queue
-	preemptionPriority        int32                     // the current preemption priority of this queue
+	sortType            policies.SortPolicy       // How applications (leaf) or queues (parents) are sorted
+	children            map[string]*Queue         // Only for direct children, parent queue only
+	childPriorities     map[string]int32          // cached priorities for child queues
+	applications        map[string]*Application   // only for leaf queue
+	appPriorities       map[string]int32          // cached priorities for application
+	reservedApps        map[string]int            // applications reserved within this queue, with reservation count
+	parent              *Queue                    // link back to the parent in the scheduler
+	pending             *resources.Resource       // pending resource for the apps in the queue
+	allocatedResource   *resources.Resource       // allocated resource for the apps in the queue
+	preemptingResource  *resources.Resource       // preempting resource for the apps in the queue

Review Comment:
   Added preemption-centric test with checks on preempting resources.





[GitHub] [yunikorn-core] pbacsko commented on a diff in pull request #511: [YUNIKORN-1467] Implement core preemption logic

Posted by "pbacsko (via GitHub)" <gi...@apache.org>.
pbacsko commented on code in PR #511:
URL: https://github.com/apache/yunikorn-core/pull/511#discussion_r1147401138


##########
pkg/scheduler/objects/preemption.go:
##########
@@ -0,0 +1,875 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package objects
+
+import (
+	"sort"
+	"sync"
+	"time"
+
+	"go.uber.org/zap"
+
+	"github.com/apache/yunikorn-core/pkg/common/resources"
+	"github.com/apache/yunikorn-core/pkg/log"
+	"github.com/apache/yunikorn-core/pkg/plugins"
+	"github.com/apache/yunikorn-scheduler-interface/lib/go/api"
+	"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
+)
+
+var (
+	preemptAttemptFrequency        = 15 * time.Second
+	preemptCheckConcurrency        = 10
+	scoreFitMax             uint64 = 1 << 32
+	scoreOriginator         uint64 = 1 << 33
+	scoreNoPreempt          uint64 = 1 << 34
+	scoreUnfit              uint64 = 1 << 35
+)
+
+// Preemptor encapsulates the functionality required for preemption victim selection
+type Preemptor struct {
+	application     *Application        // application containing ask
+	queue           *Queue              // queue to preempt for
+	queuePath       string              // path of queue to preempt for
+	headRoom        *resources.Resource // current queue headroom
+	preemptionDelay time.Duration       // preemption delay
+	ask             *AllocationAsk      // ask to be preempted for
+	iterator        NodeIterator        // iterator to enumerate all nodes
+	nodesTried      bool                // flag indicating that scheduling has already been tried on all nodes
+
+	// lazily-populated work structures
+	allocationsByQueue map[string]*QueuePreemptionSnapshot // map of queue snapshots by queue path
+	queueByAlloc       map[string]*QueuePreemptionSnapshot // map of queue snapshots by allocationID
+	allocationsByNode  map[string][]*Allocation            // map of allocation by nodeID
+	nodeAvailableMap   map[string]*resources.Resource      // map of available resources by nodeID
+}
+
+// QueuePreemptionSnapshot is used to track a snapshot of a queue for preemption
+type QueuePreemptionSnapshot struct {
+	Parent             *QueuePreemptionSnapshot // snapshot of parent queue
+	QueuePath          string                   // fully qualified path to queue
+	AllocatedResource  *resources.Resource      // allocated resources
+	PreemptingResource *resources.Resource      // resources currently flagged for preemption
+	MaxResource        *resources.Resource      // maximum resources for this queue
+	GuaranteedResource *resources.Resource      // guaranteed resources for this queue
+	PotentialVictims   []*Allocation            // list of allocations which could be preempted
+}
+
+// NewPreemptor creates a new preemptor. The preemptor itself is not thread safe, and assumes the application lock is held.

Review Comment:
   Nit: we already have `RequiredNodePreemptor`. Could we call this one `GenericPreemptor` to emphasize the difference?





[GitHub] [yunikorn-core] craigcondit commented on a diff in pull request #511: [YUNIKORN-1467] Implement core preemption logic

Posted by "craigcondit (via GitHub)" <gi...@apache.org>.
craigcondit commented on code in PR #511:
URL: https://github.com/apache/yunikorn-core/pull/511#discussion_r1150373753


##########
pkg/scheduler/objects/preemption.go:
##########
@@ -0,0 +1,875 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package objects
+
+import (
+	"sort"
+	"sync"
+	"time"
+
+	"go.uber.org/zap"
+
+	"github.com/apache/yunikorn-core/pkg/common/resources"
+	"github.com/apache/yunikorn-core/pkg/log"
+	"github.com/apache/yunikorn-core/pkg/plugins"
+	"github.com/apache/yunikorn-scheduler-interface/lib/go/api"
+	"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
+)
+
+var (
+	preemptAttemptFrequency        = 15 * time.Second
+	preemptCheckConcurrency        = 10
+	scoreFitMax             uint64 = 1 << 32
+	scoreOriginator         uint64 = 1 << 33
+	scoreNoPreempt          uint64 = 1 << 34
+	scoreUnfit              uint64 = 1 << 35
+)
+
+// Preemptor encapsulates the functionality required for preemption victim selection
+type Preemptor struct {
+	application     *Application        // application containing ask
+	queue           *Queue              // queue to preempt for
+	queuePath       string              // path of queue to preempt for
+	headRoom        *resources.Resource // current queue headroom
+	preemptionDelay time.Duration       // preemption delay
+	ask             *AllocationAsk      // ask to be preempted for
+	iterator        NodeIterator        // iterator to enumerate all nodes
+	nodesTried      bool                // flag indicating that scheduling has already been tried on all nodes
+
+	// lazily-populated work structures
+	allocationsByQueue map[string]*QueuePreemptionSnapshot // map of queue snapshots by queue path
+	queueByAlloc       map[string]*QueuePreemptionSnapshot // map of queue snapshots by allocationID
+	allocationsByNode  map[string][]*Allocation            // map of allocation by nodeID
+	nodeAvailableMap   map[string]*resources.Resource      // map of available resources by nodeID
+}
+
+// QueuePreemptionSnapshot is used to track a snapshot of a queue for preemption
+type QueuePreemptionSnapshot struct {
+	Parent             *QueuePreemptionSnapshot // snapshot of parent queue
+	QueuePath          string                   // fully qualified path to queue
+	AllocatedResource  *resources.Resource      // allocated resources
+	PreemptingResource *resources.Resource      // resources currently flagged for preemption
+	MaxResource        *resources.Resource      // maximum resources for this queue
+	GuaranteedResource *resources.Resource      // guaranteed resources for this queue
+	PotentialVictims   []*Allocation            // list of allocations which could be preempted
+}
+
+// NewPreemptor creates a new preemptor. The preemptor itself is not thread safe, and assumes the application lock is held.
+func NewPreemptor(application *Application, headRoom *resources.Resource, preemptionDelay time.Duration, ask *AllocationAsk, iterator NodeIterator, nodesTried bool) *Preemptor {
+	return &Preemptor{
+		application:     application,
+		queue:           application.queue,
+		queuePath:       application.queuePath,
+		headRoom:        headRoom,
+		preemptionDelay: preemptionDelay,
+		ask:             ask,
+		iterator:        iterator,
+		nodesTried:      nodesTried,
+	}
+}
+
+// CheckPreconditions performs simple sanity checks designed to determine if preemption should be attempted
+// for an ask. If checks succeed, updates the ask preemption check time.
+func (p *Preemptor) CheckPreconditions() bool {
+	now := time.Now()
+
+	// skip if ask is not allowed to preempt other tasks
+	if !p.ask.IsAllowPreemptOther() {
+		return false
+	}
+
+	// skip if ask has previously triggered preemption
+	if p.ask.HasTriggeredPreemption() {
+		return false
+	}
+
+	// skip if ask requires a specific node (this should be handled by required node preemption algorithm)
+	if p.ask.GetRequiredNode() != "" {
+		return false
+	}
+
+	// skip if preemption delay has not yet passed
+	if now.Before(p.ask.GetCreateTime().Add(p.preemptionDelay)) {
+		return false
+	}
+
+	// skip if attempt frequency hasn't been reached again
+	if now.Before(p.ask.GetPreemptCheckTime().Add(preemptAttemptFrequency)) {
+		return false
+	}
+
+	// mark this ask as having been checked recently to avoid doing extra work in the next scheduling cycle
+	p.ask.UpdatePreemptCheckTime()
+
+	return true
+}
+
+// initQueueSnapshots ensures that snapshots have been taken of the queue
+func (p *Preemptor) initQueueSnapshots() {
+	if p.allocationsByQueue != nil {
+		return
+	}
+
+	p.allocationsByQueue = p.queue.FindEligiblePreemptionVictims(p.queuePath, p.ask)
+}
+
+// initWorkingState builds helper data structures required to compute a solution
+func (p *Preemptor) initWorkingState() {
+	// return if we have already run
+	if p.nodeAvailableMap != nil {
+		return
+	}
+
+	// ensure queue snapshots are populated
+	p.initQueueSnapshots()
+
+	allocationsByNode := make(map[string][]*Allocation)
+	queueByAlloc := make(map[string]*QueuePreemptionSnapshot)
+	nodeAvailableMap := make(map[string]*resources.Resource)
+
+	// build a map from NodeID to allocation and from allocationID to queue capacities
+	for _, victims := range p.allocationsByQueue {
+		for _, allocation := range victims.PotentialVictims {
+			nodeID := allocation.GetNodeID()
+			allocations, ok := allocationsByNode[nodeID]
+			if !ok {
+				allocations = make([]*Allocation, 0)
+			}
+			allocationsByNode[nodeID] = append(allocations, allocation)
+			queueByAlloc[allocation.GetAllocationKey()] = victims
+		}
+	}
+
+	// walk node iterator and track available resources per node
+	for p.iterator.HasNext() {
+		node := p.iterator.Next()
+		if !node.IsSchedulable() || (node.IsReserved() && !node.isReservedForApp(reservationKey(nil, p.application, p.ask))) {
+			// node is not available, remove any potential victims from consideration
+			delete(allocationsByNode, node.NodeID)
+		} else {
+			// track allocated and available resources
+			nodeAvailableMap[node.NodeID] = node.GetAvailableResource()
+		}
+	}
+
+	// sort the allocations on each node in the order we'd like to try them
+	sortVictimsForPreemption(allocationsByNode)
+
+	p.allocationsByNode = allocationsByNode
+	p.queueByAlloc = queueByAlloc
+	p.nodeAvailableMap = nodeAvailableMap
+}
+
+// checkPreemptionQueueGuarantees verifies that it's possible to free enough resources to fit the given ask
+func (p *Preemptor) checkPreemptionQueueGuarantees() bool {
+	p.initQueueSnapshots()
+
+	queues := p.duplicateQueueSnapshots()
+	currentQueue, ok := queues[p.queuePath]
+	if !ok {
+		log.Logger().Warn("BUG: Didn't find current queue in snapshot list",
+			zap.String("queuePath", p.queuePath))
+		return false
+	}
+
+	currentQueue.AddAllocation(p.ask.GetAllocatedResource())
+
+	// remove each allocation in turn, validating that at some point we free enough resources to allow this ask to fit
+	for _, snapshot := range queues {
+		for _, alloc := range snapshot.PotentialVictims {
+			snapshot.RemoveAllocation(alloc.GetAllocatedResource())
+			if currentQueue.IsWithinGuaranteedResource() {
+				return true
+			}
+		}
+	}
+
+	return false
+}
+
+// calculateVictimsByNode takes a list of potential victims for a node and builds a list ready for the RM to process.
+// Result is a list of allocations and the starting index to check for the initial preemption list.
+// If the result is nil, the node should not be considered for preemption.
+func (p *Preemptor) calculateVictimsByNode(nodeAvailable *resources.Resource, potentialVictims []*Allocation) (int, []*Allocation) {
+	nodeCurrentAvailable := nodeAvailable.Clone()
+	allocationsByQueueSnap := p.duplicateQueueSnapshots()
+
+	// Initial check: Will allocation fit on node without preemption? This is possible if preemption was triggered due
+	// to queue limits and not node resource limits.
+	if resources.FitIn(nodeCurrentAvailable, p.ask.GetAllocatedResource()) {
+		// return empty list so this node is considered for preemption
+		return -1, make([]*Allocation, 0)
+	}
+
+	// get the current queue snapshot
+	askQueue, ok := allocationsByQueueSnap[p.queuePath]
+	if !ok {
+		log.Logger().Warn("BUG: Queue not found by name", zap.String("queuePath", p.queuePath))
+		return -1, nil
+	}
+
+	// speculatively add the current ask
+	askQueue.AddAllocation(p.ask.GetAllocatedResource())
+
+	// First pass: Check each task to see whether we are able to reduce our shortfall by preempting each
+	// task in turn, and filter out tasks which will cause their queue to drop below guaranteed capacity.
+	// If a task could be preempted without violating queue constraints, add it to either the 'head' list or the
+	// 'tail' list depending on whether the shortfall is reduced. If added to the 'head' list, adjust the node available
+	// capacity and the queue guaranteed headroom.
+	head := make([]*Allocation, 0)
+	tail := make([]*Allocation, 0)
+	for _, victim := range potentialVictims {
+		// check to see if removing this task will keep queue above guaranteed amount; if not, skip to the next one
+		if qv, ok := p.queueByAlloc[victim.GetAllocationKey()]; ok {
+			if queueSnapshot, ok2 := allocationsByQueueSnap[qv.QueuePath]; ok2 {
+				queueSnapshot.RemoveAllocation(victim.GetAllocatedResource())
+				// did removing this allocation still keep the queue over-allocated?
+				if queueSnapshot.IsAtOrAboveGuaranteedResource() {
+					// check to see if the shortfall on the node has changed
+					shortfall := resources.SubEliminateNegative(p.ask.GetAllocatedResource(), nodeCurrentAvailable)
+					newAvailable := resources.Add(nodeCurrentAvailable, victim.GetAllocatedResource())
+					newShortfall := resources.SubEliminateNegative(p.ask.GetAllocatedResource(), newAvailable)
+					if resources.EqualsOrEmpty(shortfall, newShortfall) {
+						// shortfall did not change, so task should only be considered as a last resort
+						queueSnapshot.AddAllocation(victim.GetAllocatedResource())
+						tail = append(tail, victim)
+					} else {
+						// shortfall was decreased, so we should keep this task on the main list and adjust usage
+						nodeCurrentAvailable.AddTo(victim.GetAllocatedResource())
+						head = append(head, victim)
+					}
+				} else {
+					// removing this allocation would have reduced queue below guaranteed limits, put it back
+					queueSnapshot.AddAllocation(victim.GetAllocatedResource())
+				}
+			}
+		}
+	}
+	// merge lists
+	head = append(head, tail...)
+	if len(head) == 0 {
+		return -1, nil
+	}
+
+	// clone again
+	nodeCurrentAvailable = nodeAvailable.Clone()
+	allocationsByQueueSnap = p.duplicateQueueSnapshots()
+
+	// get the current queue snapshot
+	askQueue, ok2 := allocationsByQueueSnap[p.queuePath]
+	if !ok2 {
+		log.Logger().Warn("BUG: Queue not found by name", zap.String("queuePath", p.queuePath))
+		return -1, nil
+	}
+
+	// speculatively add the current ask
+	askQueue.AddAllocation(p.ask.GetAllocatedResource())
+
+	// Second pass: The task ordering can no longer change. For each task, check that queue constraints would not be
+	// violated if the task were to be preempted. If so, discard the task. If the task can be preempted, adjust
+	// both the node available capacity and the queue headroom. Save the Index within the results of the first task
+	// which would reduce the shortfall to zero.
+	results := make([]*Allocation, 0)
+	index := -1
+	for _, victim := range head {
+		// check to see if removing this task will keep queue above guaranteed amount; if not, skip to the next one
+		if qv, ok := p.queueByAlloc[victim.GetAllocationKey()]; ok {
+			if queueSnapshot, ok2 := allocationsByQueueSnap[qv.QueuePath]; ok2 {
+				queueSnapshot.RemoveAllocation(victim.GetAllocatedResource())
+				if queueSnapshot.IsAtOrAboveGuaranteedResource() {
+					// removing task does not violate queue constraints, adjust queue and node
+					nodeCurrentAvailable.AddTo(victim.GetAllocatedResource())
+					// check if ask now fits and we haven't had this happen before
+					if nodeCurrentAvailable.FitInMaxUndef(p.ask.GetAllocatedResource()) && index < 0 {
+						index = len(results)
+					}
+					// add victim to results
+					results = append(results, victim)
+				} else {
+					// add back resources
+					queueSnapshot.AddAllocation(victim.GetAllocatedResource())
+				}
+			}
+		}
+	}
+
+	// check to see if enough resources were freed
+	if index < 0 {
+		return -1, nil
+	}
+
+	return index, results
+}
+
+func (p *Preemptor) duplicateQueueSnapshots() map[string]*QueuePreemptionSnapshot {
+	cache := make(map[string]*QueuePreemptionSnapshot, 0)
+	for _, snapshot := range p.allocationsByQueue {
+		snapshot.Duplicate(cache)
+	}
+	return cache
+}
+
+// checkPreemptionPredicates calls the shim via the SI to evaluate nodes for preemption
+func (p *Preemptor) checkPreemptionPredicates(predicateChecks []*si.PreemptionPredicatesArgs, victimsByNode map[string][]*Allocation) *predicateCheckResult {
+	// don't process empty list
+	if len(predicateChecks) == 0 {
+		return nil
+	}
+
+	// sort predicate checks by number of expected preempted tasks
+	sort.SliceStable(predicateChecks, func(i int, j int) bool {
+		return predicateChecks[i].StartIndex < predicateChecks[j].StartIndex
+	})
+
+	// check for RM callback
+	plugin := plugins.GetResourceManagerCallbackPlugin()
+	if plugin == nil {
+		// if a plugin isn't registered, assume checks will succeed and synthesize a result
+		check := predicateChecks[0]
+		log.Logger().Debug("No RM callback plugin registered, using first selected node for preemption",
+			zap.String("NodeID", check.NodeID),
+			zap.String("AllocationKey", check.AllocationKey))
+
+		result := &predicateCheckResult{
+			allocationKey: check.AllocationKey,
+			nodeID:        check.NodeID,
+			success:       true,
+			index:         int(check.StartIndex),
+		}
+		result.populateVictims(victimsByNode)
+		return result
+	}
+
+	// process each batch of checks by sending to the RM
+	batches := batchPreemptionChecks(predicateChecks, preemptCheckConcurrency)
+	var bestResult *predicateCheckResult = nil
+	for _, batch := range batches {
+		var wg sync.WaitGroup
+		ch := make(chan *predicateCheckResult, len(batch))
+		expected := 0
+		for _, args := range batch {
+			// add goroutine for checking preemption
+			wg.Add(1)
+			expected++

Review Comment:
   Yes, this is used.





[GitHub] [yunikorn-core] craigcondit closed pull request #511: [YUNIKORN-1467] Implement core preemption logic

Posted by "craigcondit (via GitHub)" <gi...@apache.org>.
craigcondit closed pull request #511: [YUNIKORN-1467] Implement core preemption logic
URL: https://github.com/apache/yunikorn-core/pull/511




[GitHub] [yunikorn-core] craigcondit commented on a diff in pull request #511: [YUNIKORN-1467] Implement core preemption logic

Posted by "craigcondit (via GitHub)" <gi...@apache.org>.
craigcondit commented on code in PR #511:
URL: https://github.com/apache/yunikorn-core/pull/511#discussion_r1147679062


##########
pkg/scheduler/objects/preemption.go:
##########
@@ -0,0 +1,875 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package objects
+
+import (
+	"sort"
+	"sync"
+	"time"
+
+	"go.uber.org/zap"
+
+	"github.com/apache/yunikorn-core/pkg/common/resources"
+	"github.com/apache/yunikorn-core/pkg/log"
+	"github.com/apache/yunikorn-core/pkg/plugins"
+	"github.com/apache/yunikorn-scheduler-interface/lib/go/api"
+	"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
+)
+
+var (
+	preemptAttemptFrequency        = 15 * time.Second
+	preemptCheckConcurrency        = 10
+	scoreFitMax             uint64 = 1 << 32
+	scoreOriginator         uint64 = 1 << 33
+	scoreNoPreempt          uint64 = 1 << 34
+	scoreUnfit              uint64 = 1 << 35

Review Comment:
   In Go, constants are untyped by default, and we need these to be uint64 to work properly and avoid unnecessary casts everywhere. As for 1<<32, this leaves a full 32 bits for the priority value to slot into. So things get scored first by whether they have opted out of preemption, then by whether they are the app originator, and finally by priority.
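
   A minimal sketch of that bit layout follows. The `victimScore` helper, its parameters, and the "lower score is tried first" ordering are assumptions made for illustration; they are not the scoring function from this PR. Only the two flag values are taken from the diff above.

```go
package main

import "fmt"

// Flag values copied from the var block quoted in the diff.
var (
	scoreOriginator uint64 = 1 << 33
	scoreNoPreempt  uint64 = 1 << 34
)

// victimScore is a hypothetical helper, not the PR's actual code.
// The signed priority is shifted into the range 0 .. 2^32-1 so that it
// occupies only the low 32 bits and keeps its ordering. Each flag then
// sets a single higher bit, so any flag outranks any priority value.
func victimScore(priority int32, originator, noPreempt bool) uint64 {
	score := uint64(int64(priority) + (1 << 31))
	if originator {
		score |= scoreOriginator
	}
	if noPreempt {
		score |= scoreNoPreempt
	}
	return score
}

func main() {
	low := victimScore(-5, false, false)
	high := victimScore(100, false, false)
	orig := victimScore(-5, true, false)
	opted := victimScore(-5, false, true)
	// prints "true true true"
	fmt.Println(low < high, high < orig, orig < opted)
}
```

   Because the flags sit above bit 32, comparing two scores as plain uint64 values yields the three-level ordering described above without any extra branching in the sort.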





[GitHub] [yunikorn-core] craigcondit commented on a diff in pull request #511: [YUNIKORN-1467] Implement core preemption logic

Posted by "craigcondit (via GitHub)" <gi...@apache.org>.
craigcondit commented on code in PR #511:
URL: https://github.com/apache/yunikorn-core/pull/511#discussion_r1147679909


##########
pkg/metrics/queue.go:
##########
@@ -131,3 +131,11 @@ func (m *QueueMetrics) SetQueuePendingResourceMetrics(resourceName string, value
 func (m *QueueMetrics) AddQueuePendingResourceMetrics(resourceName string, value float64) {
 	m.ResourceMetrics.With(prometheus.Labels{"state": "pending", "resource": resourceName}).Add(value)
 }
+
+func (m *QueueMetrics) SetQueuePreemptingResourceMetrics(resourceName string, value float64) {
+	m.ResourceMetrics.With(prometheus.Labels{"state": "preempting", "resource": resourceName}).Set(value)
+}
+
+func (m *QueueMetrics) AddQueuePreemptingResourceMetrics(resourceName string, value float64) {

Review Comment:
   Correct, it was added for parity with the other metrics methods. It seemed appropriate to match the existing patterns.
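
   As a rough illustration of the Set/Add pairing, the sketch below uses a hypothetical in-memory `gauge` type in place of the Prometheus gauge vector. All type and method names here are assumptions for illustration, not the code in `pkg/metrics/queue.go`.

```go
package main

import "fmt"

// gauge is a hypothetical stand-in for a single labelled gauge.
type gauge struct{ value float64 }

func (g *gauge) Set(v float64) { g.value = v }  // overwrite the current value
func (g *gauge) Add(v float64) { g.value += v } // apply a delta

// queueMetrics keeps one gauge per (state, resource) pair.
type queueMetrics struct{ gauges map[string]*gauge }

func newQueueMetrics() *queueMetrics {
	return &queueMetrics{gauges: make(map[string]*gauge)}
}

// with returns the gauge for the given labels, creating it on first use.
func (m *queueMetrics) with(state, resource string) *gauge {
	key := state + "/" + resource
	g, ok := m.gauges[key]
	if !ok {
		g = &gauge{}
		m.gauges[key] = g
	}
	return g
}

// SetPreempting and AddPreempting mirror the Set/Add pair that exists
// for the "pending" state.
func (m *queueMetrics) SetPreempting(resource string, v float64) {
	m.with("preempting", resource).Set(v)
}

func (m *queueMetrics) AddPreempting(resource string, v float64) {
	m.with("preempting", resource).Add(v)
}

func main() {
	m := newQueueMetrics()
	m.SetPreempting("memory", 100)
	m.AddPreempting("memory", 50)
	fmt.Println(m.with("preempting", "memory").value) // prints 150
	m.SetPreempting("memory", 10)
	fmt.Println(m.with("preempting", "memory").value) // prints 10
}
```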





[GitHub] [yunikorn-core] pbacsko commented on a diff in pull request #511: [YUNIKORN-1467] Implement core preemption logic

Posted by "pbacsko (via GitHub)" <gi...@apache.org>.
pbacsko commented on code in PR #511:
URL: https://github.com/apache/yunikorn-core/pull/511#discussion_r1147586397


##########
pkg/scheduler/partition_test.go:
##########
@@ -123,7 +123,6 @@ func TestNewWithPlacement(t *testing.T) {
 			},

Review Comment:
   Generic comment: we don't have preemption tests from the partition's point of view. Those tests must be written; see `TestPreemptionForRequiredNodeNormalAlloc()` and `TestPreemptionForRequiredNodeReservedAlloc()`.





[GitHub] [yunikorn-core] pbacsko commented on a diff in pull request #511: [YUNIKORN-1467] Implement core preemption logic

Posted by "pbacsko (via GitHub)" <gi...@apache.org>.
pbacsko commented on code in PR #511:
URL: https://github.com/apache/yunikorn-core/pull/511#discussion_r1147567537


##########
pkg/scheduler/objects/application.go:
##########
@@ -903,8 +901,21 @@ func (sa *Application) tryAllocate(headRoom *resources.Resource, nodeIterator fu
 		if sa.canReplace(request) {
 			continue
 		}
-		// resource must fit in headroom otherwise skip the request
+
+		// resource must fit in headroom otherwise skip the request (unless preemption could help)
 		if !headRoom.FitInMaxUndef(request.GetAllocatedResource()) {
+			// attempt preemption
+			if *preemptAttemptsRemaining > 0 {
+				*preemptAttemptsRemaining--
+				fullIterator := fullNodeIterator()
+				if fullIterator != nil {
+					if alloc, ok := sa.tryPreemption(headRoom, preemptionDelay, request, fullIterator, false); ok {

Review Comment:
   In general, is there a way to disable preemption completely? Right now this is a new, experimental feature, and someone might decide not to use it at all. I looked everywhere but couldn't find a feature toggle, which I think is important to have even once the feature matures.





[GitHub] [yunikorn-core] pbacsko commented on a diff in pull request #511: [YUNIKORN-1467] Implement core preemption logic

Posted by "pbacsko (via GitHub)" <gi...@apache.org>.
pbacsko commented on code in PR #511:
URL: https://github.com/apache/yunikorn-core/pull/511#discussion_r1147510086


##########
pkg/scheduler/objects/preemption.go:
##########
@@ -0,0 +1,875 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package objects
+
+import (
+	"sort"
+	"sync"
+	"time"
+
+	"go.uber.org/zap"
+
+	"github.com/apache/yunikorn-core/pkg/common/resources"
+	"github.com/apache/yunikorn-core/pkg/log"
+	"github.com/apache/yunikorn-core/pkg/plugins"
+	"github.com/apache/yunikorn-scheduler-interface/lib/go/api"
+	"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
+)
+
+var (
+	preemptAttemptFrequency        = 15 * time.Second
+	preemptCheckConcurrency        = 10
+	scoreFitMax             uint64 = 1 << 32
+	scoreOriginator         uint64 = 1 << 33
+	scoreNoPreempt          uint64 = 1 << 34
+	scoreUnfit              uint64 = 1 << 35
+)
+
+// Preemptor encapsulates the functionality required for preemption victim selection
+type Preemptor struct {
+	application     *Application        // application containing ask
+	queue           *Queue              // queue to preempt for
+	queuePath       string              // path of queue to preempt for
+	headRoom        *resources.Resource // current queue headroom
+	preemptionDelay time.Duration       // preemption delay
+	ask             *AllocationAsk      // ask to be preempted for
+	iterator        NodeIterator        // iterator to enumerate all nodes
+	nodesTried      bool                // flag indicating that scheduling has already been tried on all nodes
+
+	// lazily-populated work structures
+	allocationsByQueue map[string]*QueuePreemptionSnapshot // map of queue snapshots by queue path
+	queueByAlloc       map[string]*QueuePreemptionSnapshot // map of queue snapshots by allocationID
+	allocationsByNode  map[string][]*Allocation            // map of allocation by nodeID
+	nodeAvailableMap   map[string]*resources.Resource      // map of available resources by nodeID
+}
+
+// QueuePreemptionSnapshot is used to track a snapshot of a queue for preemption
+type QueuePreemptionSnapshot struct {
+	Parent             *QueuePreemptionSnapshot // snapshot of parent queue
+	QueuePath          string                   // fully qualified path to queue
+	AllocatedResource  *resources.Resource      // allocated resources
+	PreemptingResource *resources.Resource      // resources currently flagged for preemption
+	MaxResource        *resources.Resource      // maximum resources for this queue
+	GuaranteedResource *resources.Resource      // guaranteed resources for this queue
+	PotentialVictims   []*Allocation            // list of allocations which could be preempted
+}
+
+// NewPreemptor creates a new preemptor. The preemptor itself is not thread safe, and assumes the application lock is held.
+func NewPreemptor(application *Application, headRoom *resources.Resource, preemptionDelay time.Duration, ask *AllocationAsk, iterator NodeIterator, nodesTried bool) *Preemptor {
+	return &Preemptor{
+		application:     application,
+		queue:           application.queue,
+		queuePath:       application.queuePath,
+		headRoom:        headRoom,
+		preemptionDelay: preemptionDelay,
+		ask:             ask,
+		iterator:        iterator,
+		nodesTried:      nodesTried,
+	}
+}
+
+// CheckPreconditions performs simple sanity checks designed to determine if preemption should be attempted
+// for an ask. If checks succeed, updates the ask preemption check time.
+func (p *Preemptor) CheckPreconditions() bool {
+	now := time.Now()
+
+	// skip if ask is not allowed to preempt other tasks
+	if !p.ask.IsAllowPreemptOther() {
+		return false
+	}
+
+	// skip if ask has previously triggered preemption
+	if p.ask.HasTriggeredPreemption() {
+		return false
+	}
+
+	// skip if ask requires a specific node (this should be handled by required node preemption algorithm)
+	if p.ask.GetRequiredNode() != "" {
+		return false
+	}
+
+	// skip if preemption delay has not yet passed
+	if now.Before(p.ask.GetCreateTime().Add(p.preemptionDelay)) {
+		return false
+	}
+
+	// skip if attempt frequency hasn't been reached again
+	if now.Before(p.ask.GetPreemptCheckTime().Add(preemptAttemptFrequency)) {
+		return false
+	}
+
+	// mark this ask as having been checked recently to avoid doing extra work in the next scheduling cycle
+	p.ask.UpdatePreemptCheckTime()
+
+	return true
+}
+
+// initQueueSnapshots ensures that snapshots have been taken of the queue
+func (p *Preemptor) initQueueSnapshots() {
+	if p.allocationsByQueue != nil {
+		return
+	}
+
+	p.allocationsByQueue = p.queue.FindEligiblePreemptionVictims(p.queuePath, p.ask)
+}
+
+// initWorkingState builds helper data structures required to compute a solution
+func (p *Preemptor) initWorkingState() {
+	// return if we have already run
+	if p.nodeAvailableMap != nil {
+		return
+	}
+
+	// ensure queue snapshots are populated
+	p.initQueueSnapshots()
+
+	allocationsByNode := make(map[string][]*Allocation)
+	queueByAlloc := make(map[string]*QueuePreemptionSnapshot)
+	nodeAvailableMap := make(map[string]*resources.Resource)
+
+	// build a map from NodeID to allocation and from allocationID to queue capacities
+	for _, victims := range p.allocationsByQueue {
+		for _, allocation := range victims.PotentialVictims {
+			nodeID := allocation.GetNodeID()
+			allocations, ok := allocationsByNode[nodeID]
+			if !ok {
+				allocations = make([]*Allocation, 0)
+			}
+			allocationsByNode[nodeID] = append(allocations, allocation)
+			queueByAlloc[allocation.GetAllocationKey()] = victims
+		}
+	}
+
+	// walk node iterator and track available resources per node
+	for p.iterator.HasNext() {
+		node := p.iterator.Next()
+		if !node.IsSchedulable() || (node.IsReserved() && !node.isReservedForApp(reservationKey(nil, p.application, p.ask))) {
+			// node is not available, remove any potential victims from consideration
+			delete(allocationsByNode, node.NodeID)
+		} else {
+			// track allocated and available resources
+			nodeAvailableMap[node.NodeID] = node.GetAvailableResource()
+		}
+	}
+
+	// sort the allocations on each node in the order we'd like to try them
+	sortVictimsForPreemption(allocationsByNode)
+
+	p.allocationsByNode = allocationsByNode
+	p.queueByAlloc = queueByAlloc
+	p.nodeAvailableMap = nodeAvailableMap
+}
+
+// checkPreemptionQueueGuarantees verifies that it's possible to free enough resources to fit the given ask
+func (p *Preemptor) checkPreemptionQueueGuarantees() bool {
+	p.initQueueSnapshots()
+
+	queues := p.duplicateQueueSnapshots()
+	currentQueue, ok := queues[p.queuePath]
+	if !ok {
+		log.Logger().Warn("BUG: Didn't find current queue in snapshot list",
+			zap.String("queuePath", p.queuePath))
+		return false
+	}
+
+	currentQueue.AddAllocation(p.ask.GetAllocatedResource())
+
+	// remove each allocation in turn, validating that at some point we free enough resources to allow this ask to fit
+	for _, snapshot := range queues {
+		for _, alloc := range snapshot.PotentialVictims {
+			snapshot.RemoveAllocation(alloc.GetAllocatedResource())
+			if currentQueue.IsWithinGuaranteedResource() {
+				return true
+			}
+		}
+	}
+
+	return false
+}
+
+// calculateVictimsByNode takes a list of potential victims for a node and builds a list ready for the RM to process.
+// Result is a list of allocations and the starting index to check for the initial preemption list.
+// If the result is nil, the node should not be considered for preemption.
+func (p *Preemptor) calculateVictimsByNode(nodeAvailable *resources.Resource, potentialVictims []*Allocation) (int, []*Allocation) {
+	nodeCurrentAvailable := nodeAvailable.Clone()
+	allocationsByQueueSnap := p.duplicateQueueSnapshots()
+
+	// Initial check: Will allocation fit on node without preemption? This is possible if preemption was triggered due
+	// to queue limits and not node resource limits.
+	if resources.FitIn(nodeCurrentAvailable, p.ask.GetAllocatedResource()) {
+		// return empty list so this node is considered for preemption
+		return -1, make([]*Allocation, 0)
+	}
+
+	// get the current queue snapshot
+	askQueue, ok := allocationsByQueueSnap[p.queuePath]
+	if !ok {
+		log.Logger().Warn("BUG: Queue not found by name", zap.String("queuePath", p.queuePath))
+		return -1, nil
+	}
+
+	// speculatively add the current ask
+	askQueue.AddAllocation(p.ask.GetAllocatedResource())
+
+	// First pass: Check each task to see whether we are able to reduce our shortfall by preempting each
+	// task in turn, and filter out tasks which will cause their queue to drop below guaranteed capacity.
+	// If a task could be preempted without violating queue constraints, add it to either the 'head' list or the
+	// 'tail' list depending on whether the shortfall is reduced. If added to the 'head' list, adjust the node available
+	// capacity and the queue guaranteed headroom.
+	head := make([]*Allocation, 0)
+	tail := make([]*Allocation, 0)
+	for _, victim := range potentialVictims {
+		// check to see if removing this task will keep queue above guaranteed amount; if not, skip to the next one
+		if qv, ok := p.queueByAlloc[victim.GetAllocationKey()]; ok {
+			if queueSnapshot, ok2 := allocationsByQueueSnap[qv.QueuePath]; ok2 {
+				queueSnapshot.RemoveAllocation(victim.GetAllocatedResource())
+				// did removing this allocation still keep the queue over-allocated?
+				if queueSnapshot.IsAtOrAboveGuaranteedResource() {
+					// check to see if the shortfall on the node has changed
+					shortfall := resources.SubEliminateNegative(p.ask.GetAllocatedResource(), nodeCurrentAvailable)
+					newAvailable := resources.Add(nodeCurrentAvailable, victim.GetAllocatedResource())
+					newShortfall := resources.SubEliminateNegative(p.ask.GetAllocatedResource(), newAvailable)
+					if resources.EqualsOrEmpty(shortfall, newShortfall) {
+						// shortfall did not change, so task should only be considered as a last resort
+						queueSnapshot.AddAllocation(victim.GetAllocatedResource())
+						tail = append(tail, victim)
+					} else {
+						// shortfall was decreased, so we should keep this task on the main list and adjust usage
+						nodeCurrentAvailable.AddTo(victim.GetAllocatedResource())
+						head = append(head, victim)
+					}
+				} else {
+					// removing this allocation would have reduced queue below guaranteed limits, put it back
+					queueSnapshot.AddAllocation(victim.GetAllocatedResource())
+				}
+			}
+		}
+	}
+	// merge lists
+	head = append(head, tail...)
+	if len(head) == 0 {
+		return -1, nil
+	}
+
+	// clone again
+	nodeCurrentAvailable = nodeAvailable.Clone()
+	allocationsByQueueSnap = p.duplicateQueueSnapshots()
+
+	// get the current queue snapshot
+	askQueue, ok2 := allocationsByQueueSnap[p.queuePath]
+	if !ok2 {
+		log.Logger().Warn("BUG: Queue not found by name", zap.String("queuePath", p.queuePath))
+		return -1, nil
+	}
+
+	// speculatively add the current ask
+	askQueue.AddAllocation(p.ask.GetAllocatedResource())
+
+	// Second pass: The task ordering can no longer change. For each task, check that queue constraints would not be
+	// violated if the task were to be preempted. If so, discard the task. If the task can be preempted, adjust
+	// both the node available capacity and the queue headroom. Save the Index within the results of the first task
+	// which would reduce the shortfall to zero.
+	results := make([]*Allocation, 0)
+	index := -1
+	for _, victim := range head {
+		// check to see if removing this task will keep queue above guaranteed amount; if not, skip to the next one
+		if qv, ok := p.queueByAlloc[victim.GetAllocationKey()]; ok {
+			if queueSnapshot, ok2 := allocationsByQueueSnap[qv.QueuePath]; ok2 {
+				queueSnapshot.RemoveAllocation(victim.GetAllocatedResource())
+				if queueSnapshot.IsAtOrAboveGuaranteedResource() {
+					// removing task does not violate queue constraints, adjust queue and node
+					nodeCurrentAvailable.AddTo(victim.GetAllocatedResource())
+					// check if ask now fits and we haven't had this happen before
+					if nodeCurrentAvailable.FitInMaxUndef(p.ask.GetAllocatedResource()) && index < 0 {
+						index = len(results)
+					}
+					// add victim to results
+					results = append(results, victim)
+				} else {
+					// add back resources
+					queueSnapshot.AddAllocation(victim.GetAllocatedResource())
+				}
+			}
+		}
+	}
+
+	// check to see if enough resources were freed
+	if index < 0 {
+		return -1, nil
+	}
+
+	return index, results
+}
+
+func (p *Preemptor) duplicateQueueSnapshots() map[string]*QueuePreemptionSnapshot {
+	cache := make(map[string]*QueuePreemptionSnapshot, 0)
+	for _, snapshot := range p.allocationsByQueue {
+		snapshot.Duplicate(cache)
+	}
+	return cache
+}
+
+// checkPreemptionPredicates calls the shim via the SI to evaluate nodes for preemption
+func (p *Preemptor) checkPreemptionPredicates(predicateChecks []*si.PreemptionPredicatesArgs, victimsByNode map[string][]*Allocation) *predicateCheckResult {
+	// don't process empty list
+	if len(predicateChecks) == 0 {
+		return nil
+	}
+
+	// sort predicate checks by number of expected preempted tasks
+	sort.SliceStable(predicateChecks, func(i int, j int) bool {
+		return predicateChecks[i].StartIndex < predicateChecks[j].StartIndex
+	})
+
+	// check for RM callback
+	plugin := plugins.GetResourceManagerCallbackPlugin()
+	if plugin == nil {
+		// if a plugin isn't registered, assume checks will succeed and synthesize a result
+		check := predicateChecks[0]
+		log.Logger().Debug("No RM callback plugin registered, using first selected node for preemption",
+			zap.String("NodeID", check.NodeID),
+			zap.String("AllocationKey", check.AllocationKey))
+
+		result := &predicateCheckResult{
+			allocationKey: check.AllocationKey,
+			nodeID:        check.NodeID,
+			success:       true,
+			index:         int(check.StartIndex),
+		}
+		result.populateVictims(victimsByNode)
+		return result
+	}
+
+	// process each batch of checks by sending to the RM
+	batches := batchPreemptionChecks(predicateChecks, preemptCheckConcurrency)
+	var bestResult *predicateCheckResult = nil
+	for _, batch := range batches {
+		var wg sync.WaitGroup
+		ch := make(chan *predicateCheckResult, len(batch))
+		expected := 0
+		for _, args := range batch {
+			// add goroutine for checking preemption
+			wg.Add(1)
+			expected++
+			go preemptPredicateCheck(plugin, ch, &wg, args)
+		}
+		// wait for completion and close channel
+		go func() {
+			wg.Wait()
+			close(ch)
+		}()
+		for result := range ch {
+			// if result is successful, keep track of it
+			if result.success {
+				if bestResult == nil {
+					bestResult = result
+				} else if result.betterThan(bestResult, p.allocationsByNode) {
+					bestResult = result

Review Comment:
   Codecov reports that this branch is not covered. Is it worth adding a test for it?





[GitHub] [yunikorn-core] craigcondit commented on a diff in pull request #511: [YUNIKORN-1467] Implement core preemption logic

Posted by "craigcondit (via GitHub)" <gi...@apache.org>.
craigcondit commented on code in PR #511:
URL: https://github.com/apache/yunikorn-core/pull/511#discussion_r1147675001


##########
pkg/scheduler/objects/application.go:
##########
@@ -961,11 +972,22 @@ func (sa *Application) tryAllocate(headRoom *resources.Resource, nodeIterator fu
 
 		iterator := nodeIterator()
 		if iterator != nil {
-			alloc := sa.tryNodes(request, iterator)
-			// have a candidate return it
-			if alloc != nil {
+			if alloc := sa.tryNodes(request, iterator); alloc != nil {
+				// have a candidate return it
 				return alloc
 			}
+
+			// no nodes qualify, attempt preemption
+			if *preemptAttemptsRemaining > 0 {
+				*preemptAttemptsRemaining--
+				fullIterator := fullNodeIterator()
+				if fullIterator != nil {

Review Comment:
   The node iterator will return nil if there are no nodes to iterate. This is simply defensive coding to guard against that case. It didn't seem worth a unit test.
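
The guard described here can be illustrated with a simplified stand-in. This is only a sketch: `NodeIterator` below is a reduced, hypothetical version of the scheduler's iterator (it yields plain strings), and `newIterator` and `countNodes` are invented for the example.

```go
package main

import "fmt"

// NodeIterator is a simplified, hypothetical stand-in for the scheduler's iterator type.
type NodeIterator interface {
	HasNext() bool
	Next() string
}

type sliceIterator struct {
	nodes []string
	pos   int
}

func (s *sliceIterator) HasNext() bool { return s.pos < len(s.nodes) }

func (s *sliceIterator) Next() string {
	n := s.nodes[s.pos]
	s.pos++
	return n
}

// newIterator returns a nil interface value when there is nothing to iterate.
func newIterator(nodes []string) NodeIterator {
	if len(nodes) == 0 {
		return nil
	}
	return &sliceIterator{nodes: nodes}
}

// countNodes checks for a nil iterator before walking it, so an empty
// node list cannot cause a nil dereference.
func countNodes(factory func() NodeIterator) int {
	it := factory()
	if it == nil {
		return 0
	}
	count := 0
	for it.HasNext() {
		it.Next()
		count++
	}
	return count
}

func main() {
	empty := func() NodeIterator { return newIterator(nil) }
	two := func() NodeIterator { return newIterator([]string{"node-1", "node-2"}) }
	fmt.Println(countNodes(empty), countNodes(two))
}
```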





[GitHub] [yunikorn-core] craigcondit commented on a diff in pull request #511: [YUNIKORN-1467] Implement core preemption logic

Posted by "craigcondit (via GitHub)" <gi...@apache.org>.
craigcondit commented on code in PR #511:
URL: https://github.com/apache/yunikorn-core/pull/511#discussion_r1147683124


##########
pkg/scheduler/objects/queue.go:
##########
@@ -41,29 +40,32 @@ import (
 	"github.com/apache/yunikorn-scheduler-interface/lib/go/common"
 )
 
+var (
+	maxPreemptionsPerQueue = 10 // maximum number of asks to attempt to preempt for in a single queue

Review Comment:
   I made it a var because we may want to make it configurable later. I didn't want to do so right away, as I think this will work fine as a default until proven otherwise.
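
As a rough illustration of why a `var` leaves that door open, the sketch below overrides the limit at run time and restores it afterwards. Only `maxPreemptionsPerQueue` and its default of 10 come from the diff; `setMaxPreemptions` and `attemptsFor` are hypothetical helpers written for this example.

```go
package main

import "fmt"

// maxPreemptionsPerQueue mirrors the variable in the diff; unlike a const, a var can be reassigned.
var maxPreemptionsPerQueue = 10

// setMaxPreemptions is a hypothetical helper that overrides the limit and
// returns a function restoring the previous value.
func setMaxPreemptions(limit int) (restore func()) {
	previous := maxPreemptionsPerQueue
	maxPreemptionsPerQueue = limit
	return func() { maxPreemptionsPerQueue = previous }
}

// attemptsFor returns how many pending asks would be attempted under the current limit.
func attemptsFor(pendingAsks int) int {
	if pendingAsks < maxPreemptionsPerQueue {
		return pendingAsks
	}
	return maxPreemptionsPerQueue
}

func main() {
	fmt.Println(attemptsFor(25))

	restore := setMaxPreemptions(3)
	fmt.Println(attemptsFor(25))

	restore()
	fmt.Println(attemptsFor(25))
}
```

The same override-and-restore pattern is often used in unit tests to exercise a limit without creating many asks.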





[GitHub] [yunikorn-core] manirajv06 commented on a diff in pull request #511: [YUNIKORN-1467] Implement core preemption logic

Posted by "manirajv06 (via GitHub)" <gi...@apache.org>.
manirajv06 commented on code in PR #511:
URL: https://github.com/apache/yunikorn-core/pull/511#discussion_r1150228604


##########
pkg/scheduler/objects/preemption.go:
##########
@@ -0,0 +1,875 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package objects
+
+import (
+	"sort"
+	"sync"
+	"time"
+
+	"go.uber.org/zap"
+
+	"github.com/apache/yunikorn-core/pkg/common/resources"
+	"github.com/apache/yunikorn-core/pkg/log"
+	"github.com/apache/yunikorn-core/pkg/plugins"
+	"github.com/apache/yunikorn-scheduler-interface/lib/go/api"
+	"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
+)
+
+var (
+	preemptAttemptFrequency        = 15 * time.Second
+	preemptCheckConcurrency        = 10
+	scoreFitMax             uint64 = 1 << 32
+	scoreOriginator         uint64 = 1 << 33
+	scoreNoPreempt          uint64 = 1 << 34
+	scoreUnfit              uint64 = 1 << 35
+)
+
+// Preemptor encapsulates the functionality required for preemption victim selection
+type Preemptor struct {
+	application     *Application        // application containing ask
+	queue           *Queue              // queue to preempt for
+	queuePath       string              // path of queue to preempt for
+	headRoom        *resources.Resource // current queue headroom
+	preemptionDelay time.Duration       // preemption delay
+	ask             *AllocationAsk      // ask to be preempted for
+	iterator        NodeIterator        // iterator to enumerate all nodes
+	nodesTried      bool                // flag indicating that scheduling has already been tried on all nodes
+
+	// lazily-populated work structures
+	allocationsByQueue map[string]*QueuePreemptionSnapshot // map of queue snapshots by queue path
+	queueByAlloc       map[string]*QueuePreemptionSnapshot // map of queue snapshots by allocationID
+	allocationsByNode  map[string][]*Allocation            // map of allocation by nodeID
+	nodeAvailableMap   map[string]*resources.Resource      // map of available resources by nodeID
+}
+
+// QueuePreemptionSnapshot is used to track a snapshot of a queue for preemption
+type QueuePreemptionSnapshot struct {
+	Parent             *QueuePreemptionSnapshot // snapshot of parent queue
+	QueuePath          string                   // fully qualified path to queue
+	AllocatedResource  *resources.Resource      // allocated resources
+	PreemptingResource *resources.Resource      // resources currently flagged for preemption
+	MaxResource        *resources.Resource      // maximum resources for this queue
+	GuaranteedResource *resources.Resource      // guaranteed resources for this queue
+	PotentialVictims   []*Allocation            // list of allocations which could be preempted
+}
+
+// NewPreemptor creates a new preemptor. The preemptor itself is not thread safe, and assumes the application lock is held.
+func NewPreemptor(application *Application, headRoom *resources.Resource, preemptionDelay time.Duration, ask *AllocationAsk, iterator NodeIterator, nodesTried bool) *Preemptor {
+	return &Preemptor{
+		application:     application,
+		queue:           application.queue,
+		queuePath:       application.queuePath,
+		headRoom:        headRoom,
+		preemptionDelay: preemptionDelay,
+		ask:             ask,
+		iterator:        iterator,
+		nodesTried:      nodesTried,
+	}
+}
+
+// CheckPreconditions performs simple sanity checks designed to determine if preemption should be attempted
+// for an ask. If checks succeed, updates the ask preemption check time.
+func (p *Preemptor) CheckPreconditions() bool {
+	now := time.Now()
+
+	// skip if ask is not allowed to preempt other tasks
+	if !p.ask.IsAllowPreemptOther() {
+		return false
+	}
+
+	// skip if ask has previously triggered preemption
+	if p.ask.HasTriggeredPreemption() {
+		return false
+	}
+
+	// skip if ask requires a specific node (this should be handled by required node preemption algorithm)
+	if p.ask.GetRequiredNode() != "" {
+		return false
+	}
+
+	// skip if preemption delay has not yet passed
+	if now.Before(p.ask.GetCreateTime().Add(p.preemptionDelay)) {
+		return false
+	}
+
+	// skip if attempt frequency hasn't been reached again
+	if now.Before(p.ask.GetPreemptCheckTime().Add(preemptAttemptFrequency)) {
+		return false
+	}
+
+	// mark this ask as having been checked recently to avoid doing extra work in the next scheduling cycle
+	p.ask.UpdatePreemptCheckTime()
+
+	return true
+}
+
+// initQueueSnapshots ensures that snapshots have been taken of the queue
+func (p *Preemptor) initQueueSnapshots() {
+	if p.allocationsByQueue != nil {
+		return
+	}
+
+	p.allocationsByQueue = p.queue.FindEligiblePreemptionVictims(p.queuePath, p.ask)
+}
+
+// initWorkingState builds helper data structures required to compute a solution
+func (p *Preemptor) initWorkingState() {
+	// return if we have already run
+	if p.nodeAvailableMap != nil {
+		return
+	}
+
+	// ensure queue snapshots are populated
+	p.initQueueSnapshots()
+
+	allocationsByNode := make(map[string][]*Allocation)
+	queueByAlloc := make(map[string]*QueuePreemptionSnapshot)
+	nodeAvailableMap := make(map[string]*resources.Resource)
+
+	// build a map from NodeID to allocation and from allocationID to queue capacities
+	for _, victims := range p.allocationsByQueue {
+		for _, allocation := range victims.PotentialVictims {
+			nodeID := allocation.GetNodeID()
+			allocations, ok := allocationsByNode[nodeID]
+			if !ok {
+				allocations = make([]*Allocation, 0)
+			}
+			allocationsByNode[nodeID] = append(allocations, allocation)
+			queueByAlloc[allocation.GetAllocationKey()] = victims
+		}
+	}
+
+	// walk node iterator and track available resources per node
+	for p.iterator.HasNext() {
+		node := p.iterator.Next()
+		if !node.IsSchedulable() || (node.IsReserved() && !node.isReservedForApp(reservationKey(nil, p.application, p.ask))) {
+			// node is not available, remove any potential victims from consideration
+			delete(allocationsByNode, node.NodeID)
+		} else {
+			// track allocated and available resources
+			nodeAvailableMap[node.NodeID] = node.GetAvailableResource()
+		}
+	}
+
+	// sort the allocations on each node in the order we'd like to try them
+	sortVictimsForPreemption(allocationsByNode)
+
+	p.allocationsByNode = allocationsByNode
+	p.queueByAlloc = queueByAlloc
+	p.nodeAvailableMap = nodeAvailableMap
+}
+
+// checkPreemptionQueueGuarantees verifies that it's possible to free enough resources to fit the given ask
+func (p *Preemptor) checkPreemptionQueueGuarantees() bool {
+	p.initQueueSnapshots()
+
+	queues := p.duplicateQueueSnapshots()
+	currentQueue, ok := queues[p.queuePath]
+	if !ok {
+		log.Logger().Warn("BUG: Didn't find current queue in snapshot list",
+			zap.String("queuePath", p.queuePath))
+		return false
+	}
+
+	currentQueue.AddAllocation(p.ask.GetAllocatedResource())
+
+	// remove each allocation in turn, validating that at some point we free enough resources to allow this ask to fit
+	for _, snapshot := range queues {
+		for _, alloc := range snapshot.PotentialVictims {
+			snapshot.RemoveAllocation(alloc.GetAllocatedResource())
+			if currentQueue.IsWithinGuaranteedResource() {
+				return true
+			}
+		}
+	}
+
+	return false
+}
+
+// calculateVictimsByNode takes a list of potential victims for a node and builds a list ready for the RM to process.
+// Result is a list of allocations and the starting index to check for the initial preemption list.
+// If the result is nil, the node should not be considered for preemption.
+func (p *Preemptor) calculateVictimsByNode(nodeAvailable *resources.Resource, potentialVictims []*Allocation) (int, []*Allocation) {
+	nodeCurrentAvailable := nodeAvailable.Clone()
+	allocationsByQueueSnap := p.duplicateQueueSnapshots()
+
+	// Initial check: Will allocation fit on node without preemption? This is possible if preemption was triggered due
+	// to queue limits and not node resource limits.
+	if resources.FitIn(nodeCurrentAvailable, p.ask.GetAllocatedResource()) {
+		// return empty list so this node is considered for preemption
+		return -1, make([]*Allocation, 0)
+	}
+
+	// get the current queue snapshot
+	askQueue, ok := allocationsByQueueSnap[p.queuePath]
+	if !ok {
+		log.Logger().Warn("BUG: Queue not found by name", zap.String("queuePath", p.queuePath))
+		return -1, nil
+	}
+
+	// speculatively add the current ask
+	askQueue.AddAllocation(p.ask.GetAllocatedResource())
+
+	// First pass: Check each task to see whether we are able to reduce our shortfall by preempting each
+	// task in turn, and filter out tasks which will cause their queue to drop below guaranteed capacity.
+	// If a task could be preempted without violating queue constraints, add it to either the 'head' list or the
+	// 'tail' list depending on whether the shortfall is reduced. If added to the 'head' list, adjust the node available
+	// capacity and the queue guaranteed headroom.
+	head := make([]*Allocation, 0)
+	tail := make([]*Allocation, 0)
+	for _, victim := range potentialVictims {
+		// check to see if removing this task will keep queue above guaranteed amount; if not, skip to the next one
+		if qv, ok := p.queueByAlloc[victim.GetAllocationKey()]; ok {
+			if queueSnapshot, ok2 := allocationsByQueueSnap[qv.QueuePath]; ok2 {
+				queueSnapshot.RemoveAllocation(victim.GetAllocatedResource())
+				// did removing this allocation still keep the queue over-allocated?
+				if queueSnapshot.IsAtOrAboveGuaranteedResource() {
+					// check to see if the shortfall on the node has changed
+					shortfall := resources.SubEliminateNegative(p.ask.GetAllocatedResource(), nodeCurrentAvailable)
+					newAvailable := resources.Add(nodeCurrentAvailable, victim.GetAllocatedResource())
+					newShortfall := resources.SubEliminateNegative(p.ask.GetAllocatedResource(), newAvailable)
+					if resources.EqualsOrEmpty(shortfall, newShortfall) {
+						// shortfall did not change, so task should only be considered as a last resort
+						queueSnapshot.AddAllocation(victim.GetAllocatedResource())
+						tail = append(tail, victim)
+					} else {
+						// shortfall was decreased, so we should keep this task on the main list and adjust usage
+						nodeCurrentAvailable.AddTo(victim.GetAllocatedResource())
+						head = append(head, victim)
+					}
+				} else {
+					// removing this allocation would have reduced queue below guaranteed limits, put it back
+					queueSnapshot.AddAllocation(victim.GetAllocatedResource())
+				}
+			}
+		}
+	}
+	// merge lists
+	head = append(head, tail...)
+	if len(head) == 0 {
+		return -1, nil
+	}
+
+	// clone again
+	nodeCurrentAvailable = nodeAvailable.Clone()
+	allocationsByQueueSnap = p.duplicateQueueSnapshots()
+
+	// get the current queue snapshot
+	askQueue, ok2 := allocationsByQueueSnap[p.queuePath]
+	if !ok2 {
+		log.Logger().Warn("BUG: Queue not found by name", zap.String("queuePath", p.queuePath))
+		return -1, nil
+	}
+
+	// speculatively add the current ask
+	askQueue.AddAllocation(p.ask.GetAllocatedResource())
+
+	// Second pass: The task ordering can no longer change. For each task, check that queue constraints would not be
+	// violated if the task were to be preempted. If so, discard the task. If the task can be preempted, adjust
+	// both the node available capacity and the queue headroom. Save the Index within the results of the first task
+	// which would reduce the shortfall to zero.
+	results := make([]*Allocation, 0)
+	index := -1
+	for _, victim := range head {
+		// check to see if removing this task will keep queue above guaranteed amount; if not, skip to the next one
+		if qv, ok := p.queueByAlloc[victim.GetAllocationKey()]; ok {
+			if queueSnapshot, ok2 := allocationsByQueueSnap[qv.QueuePath]; ok2 {
+				queueSnapshot.RemoveAllocation(victim.GetAllocatedResource())
+				if queueSnapshot.IsAtOrAboveGuaranteedResource() {
+					// removing task does not violate queue constraints, adjust queue and node
+					nodeCurrentAvailable.AddTo(victim.GetAllocatedResource())
+					// check if ask now fits and we haven't had this happen before
+					if nodeCurrentAvailable.FitInMaxUndef(p.ask.GetAllocatedResource()) && index < 0 {
+						index = len(results)
+					}
+					// add victim to results
+					results = append(results, victim)
+				} else {
+					// add back resources
+					queueSnapshot.AddAllocation(victim.GetAllocatedResource())
+				}
+			}
+		}
+	}
+
+	// check to see if enough resources were freed
+	if index < 0 {
+		return -1, nil
+	}
+
+	return index, results
+}
+
+func (p *Preemptor) duplicateQueueSnapshots() map[string]*QueuePreemptionSnapshot {
+	cache := make(map[string]*QueuePreemptionSnapshot, 0)
+	for _, snapshot := range p.allocationsByQueue {
+		snapshot.Duplicate(cache)
+	}
+	return cache
+}
+
+// checkPreemptionPredicates calls the shim via the SI to evaluate nodes for preemption
+func (p *Preemptor) checkPreemptionPredicates(predicateChecks []*si.PreemptionPredicatesArgs, victimsByNode map[string][]*Allocation) *predicateCheckResult {
+	// don't process empty list
+	if len(predicateChecks) == 0 {
+		return nil
+	}
+
+	// sort predicate checks by number of expected preempted tasks
+	sort.SliceStable(predicateChecks, func(i int, j int) bool {
+		return predicateChecks[i].StartIndex < predicateChecks[j].StartIndex
+	})
+
+	// check for RM callback
+	plugin := plugins.GetResourceManagerCallbackPlugin()
+	if plugin == nil {
+		// if a plugin isn't registered, assume checks will succeed and synthesize a result
+		check := predicateChecks[0]
+		log.Logger().Debug("No RM callback plugin registered, using first selected node for preemption",
+			zap.String("NodeID", check.NodeID),
+			zap.String("AllocationKey", check.AllocationKey))
+
+		result := &predicateCheckResult{
+			allocationKey: check.AllocationKey,
+			nodeID:        check.NodeID,
+			success:       true,
+			index:         int(check.StartIndex),
+		}
+		result.populateVictims(victimsByNode)
+		return result
+	}
+
+	// process each batch of checks by sending to the RM
+	batches := batchPreemptionChecks(predicateChecks, preemptCheckConcurrency)
+	var bestResult *predicateCheckResult = nil
+	for _, batch := range batches {
+		var wg sync.WaitGroup
+		ch := make(chan *predicateCheckResult, len(batch))
+		expected := 0
+		for _, args := range batch {
+			// add goroutine for checking preemption
+			wg.Add(1)
+			expected++

Review Comment:
   Are we using this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@yunikorn.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [yunikorn-core] pbacsko commented on a diff in pull request #511: [YUNIKORN-1467] Implement core preemption logic

Posted by "pbacsko (via GitHub)" <gi...@apache.org>.
pbacsko commented on code in PR #511:
URL: https://github.com/apache/yunikorn-core/pull/511#discussion_r1147384561


##########
pkg/scheduler/objects/application.go:
##########
@@ -961,11 +972,22 @@ func (sa *Application) tryAllocate(headRoom *resources.Resource, nodeIterator fu
 
 		iterator := nodeIterator()
 		if iterator != nil {
-			alloc := sa.tryNodes(request, iterator)
-			// have a candidate return it
-			if alloc != nil {
+			if alloc := sa.tryNodes(request, iterator); alloc != nil {
+				// have a candidate return it
 				return alloc
 			}
+
+			// no nodes qualify, attempt preemption
+			if *preemptAttemptsRemaining > 0 {
+				*preemptAttemptsRemaining--
+				fullIterator := fullNodeIterator()
+				if fullIterator != nil {

Review Comment:
   When is `fullIterator == nil`? Is this a legit case? I don't see coverage in the unit tests.





[GitHub] [yunikorn-core] craigcondit commented on a diff in pull request #511: [YUNIKORN-1467] Implement core preemption logic

Posted by "craigcondit (via GitHub)" <gi...@apache.org>.
craigcondit commented on code in PR #511:
URL: https://github.com/apache/yunikorn-core/pull/511#discussion_r1147676530


##########
pkg/scheduler/objects/preemption.go:
##########
@@ -0,0 +1,875 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package objects
+
+import (
+	"sort"
+	"sync"
+	"time"
+
+	"go.uber.org/zap"
+
+	"github.com/apache/yunikorn-core/pkg/common/resources"
+	"github.com/apache/yunikorn-core/pkg/log"
+	"github.com/apache/yunikorn-core/pkg/plugins"
+	"github.com/apache/yunikorn-scheduler-interface/lib/go/api"
+	"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
+)
+
+var (
+	preemptAttemptFrequency        = 15 * time.Second
+	preemptCheckConcurrency        = 10
+	scoreFitMax             uint64 = 1 << 32
+	scoreOriginator         uint64 = 1 << 33
+	scoreNoPreempt          uint64 = 1 << 34
+	scoreUnfit              uint64 = 1 << 35
+)
+
+// Preemptor encapsulates the functionality required for preemption victim selection
+type Preemptor struct {
+	application     *Application        // application containing ask
+	queue           *Queue              // queue to preempt for
+	queuePath       string              // path of queue to preempt for
+	headRoom        *resources.Resource // current queue headroom
+	preemptionDelay time.Duration       // preemption delay
+	ask             *AllocationAsk      // ask to be preempted for
+	iterator        NodeIterator        // iterator to enumerate all nodes
+	nodesTried      bool                // flag indicating that scheduling has already been tried on all nodes
+
+	// lazily-populated work structures
+	allocationsByQueue map[string]*QueuePreemptionSnapshot // map of queue snapshots by queue path
+	queueByAlloc       map[string]*QueuePreemptionSnapshot // map of queue snapshots by allocationID
+	allocationsByNode  map[string][]*Allocation            // map of allocation by nodeID
+	nodeAvailableMap   map[string]*resources.Resource      // map of available resources by nodeID
+}
+
+// QueuePreemptionSnapshot is used to track a snapshot of a queue for preemption
+type QueuePreemptionSnapshot struct {
+	Parent             *QueuePreemptionSnapshot // snapshot of parent queue
+	QueuePath          string                   // fully qualified path to queue
+	AllocatedResource  *resources.Resource      // allocated resources
+	PreemptingResource *resources.Resource      // resources currently flagged for preemption
+	MaxResource        *resources.Resource      // maximum resources for this queue
+	GuaranteedResource *resources.Resource      // guaranteed resources for this queue
+	PotentialVictims   []*Allocation            // list of allocations which could be preempted
+}
+
+// NewPreemptor creates a new preemptor. The preemptor itself is not thread safe, and assumes the application lock is held.

Review Comment:
   I'm hesitant to do this. In fact, there was a point during the development cycle where I considered folding the required node preemptor back into this one (making it the singular preemptor). I'm not generally a fan of longer names when shorter ones will do. The required node preemptor is of course the odd one out now, so the longer name makes sense.





[GitHub] [yunikorn-core] craigcondit commented on a diff in pull request #511: [YUNIKORN-1467] Implement core preemption logic

Posted by "craigcondit (via GitHub)" <gi...@apache.org>.
craigcondit commented on code in PR #511:
URL: https://github.com/apache/yunikorn-core/pull/511#discussion_r1147675278


##########
pkg/scheduler/objects/application.go:
##########
@@ -903,8 +901,21 @@ func (sa *Application) tryAllocate(headRoom *resources.Resource, nodeIterator fu
 		if sa.canReplace(request) {
 			continue
 		}
-		// resource must fit in headroom otherwise skip the request
+
+		// resource must fit in headroom otherwise skip the request (unless preemption could help)
 		if !headRoom.FitInMaxUndef(request.GetAllocatedResource()) {
+			// attempt preemption
+			if *preemptAttemptsRemaining > 0 {
+				*preemptAttemptsRemaining--
+				fullIterator := fullNodeIterator()
+				if fullIterator != nil {

Review Comment:
   Same as above.





[GitHub] [yunikorn-core] craigcondit commented on a diff in pull request #511: [YUNIKORN-1467] Implement core preemption logic

Posted by "craigcondit (via GitHub)" <gi...@apache.org>.
craigcondit commented on code in PR #511:
URL: https://github.com/apache/yunikorn-core/pull/511#discussion_r1147693197


##########
pkg/scheduler/partition_test.go:
##########
@@ -123,7 +123,6 @@ func TestNewWithPlacement(t *testing.T) {
 			},

Review Comment:
   I'll look into this, but the further up the chain we go, the more difficult the setup gets as more of the environment needs to be mocked. This gets even more difficult in the case of preemption due to the predicate checks. It may not be a worthwhile effort.
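
One way to limit the mocking burden is to hide the predicate call behind a narrow interface and substitute a fake in tests. The sketch below is illustrative only: `predicateChecker`, `fakeChecker`, and `checkNodes` are hypothetical names that do not exist in yunikorn-core, and the real RM callback plugin has a larger surface. The fan-out uses the same WaitGroup-plus-buffered-channel shape as the batch loop in `checkPreemptionPredicates`.

```go
package main

import (
	"fmt"
	"sync"
)

// predicateChecker is a hypothetical, narrowed view of the RM callback.
// It is an assumption made for this sketch, not the real plugin interface.
type predicateChecker interface {
	Check(nodeID string) bool
}

// fakeChecker is a test double that approves only the listed nodes.
type fakeChecker struct {
	allowed map[string]bool
}

func (f fakeChecker) Check(nodeID string) bool { return f.allowed[nodeID] }

// checkNodes runs one goroutine per node and collects the IDs that pass.
// The channel is buffered to len(nodeIDs) so no sender can block.
func checkNodes(pc predicateChecker, nodeIDs []string) map[string]bool {
	var wg sync.WaitGroup
	ch := make(chan string, len(nodeIDs))
	for _, id := range nodeIDs {
		wg.Add(1)
		go func(id string) {
			defer wg.Done()
			if pc.Check(id) {
				ch <- id
			}
		}(id)
	}
	// close the channel once every check has finished
	go func() {
		wg.Wait()
		close(ch)
	}()
	passed := make(map[string]bool)
	for id := range ch {
		passed[id] = true
	}
	return passed
}

func main() {
	pc := fakeChecker{allowed: map[string]bool{"node-2": true}}
	fmt.Println(checkNodes(pc, []string{"node-1", "node-2", "node-3"}))
}
```

With a fake of this kind, a test can decide which nodes pass the predicates without involving the shim at all.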





[GitHub] [yunikorn-core] pbacsko commented on a diff in pull request #511: [YUNIKORN-1467] Implement core preemption logic

Posted by "pbacsko (via GitHub)" <gi...@apache.org>.
pbacsko commented on code in PR #511:
URL: https://github.com/apache/yunikorn-core/pull/511#discussion_r1147567537


##########
pkg/scheduler/objects/application.go:
##########
@@ -903,8 +901,21 @@ func (sa *Application) tryAllocate(headRoom *resources.Resource, nodeIterator fu
 		if sa.canReplace(request) {
 			continue
 		}
-		// resource must fit in headroom otherwise skip the request
+
+		// resource must fit in headroom otherwise skip the request (unless preemption could help)
 		if !headRoom.FitInMaxUndef(request.GetAllocatedResource()) {
+			// attempt preemption
+			if *preemptAttemptsRemaining > 0 {
+				*preemptAttemptsRemaining--
+				fullIterator := fullNodeIterator()
+				if fullIterator != nil {
+					if alloc, ok := sa.tryPreemption(headRoom, preemptionDelay, request, fullIterator, false); ok {

Review Comment:
   Is there a way to disable preemption completely? Right now this is a new, experimental feature, and someone might decide not to use it at all. I looked everywhere but couldn't find a feature toggle, which I think is important to have even once the feature becomes mature.
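
A toggle of the kind asked about could be sketched as follows. This is a minimal illustration under stated assumptions, not code from the PR: `PreemptionConfig`, `attemptBudget`, and `tryPreempt` are hypothetical names. The idea is that the existing `*preemptAttemptsRemaining > 0` guard never fires when the starting budget is zero, so a disabled flag only needs to zero the budget.

```go
package main

import "fmt"

// maxPreemptionsPerQueue mirrors the value declared in the PR's queue.go.
const maxPreemptionsPerQueue = 10

// PreemptionConfig is hypothetical; the PR defines no such type.
type PreemptionConfig struct {
	Enabled bool
}

// attemptBudget returns the number of preemption attempts allowed in one
// scheduling pass: zero when the feature is switched off.
func attemptBudget(cfg PreemptionConfig) int {
	if !cfg.Enabled {
		return 0
	}
	return maxPreemptionsPerQueue
}

// tryPreempt imitates the guard used in tryAllocate: it consumes one
// attempt if any remain and reports whether preemption may proceed.
func tryPreempt(remaining *int) bool {
	if *remaining > 0 {
		*remaining--
		return true
	}
	return false
}

func main() {
	off := attemptBudget(PreemptionConfig{Enabled: false})
	on := attemptBudget(PreemptionConfig{Enabled: true})

	allowedOff := tryPreempt(&off)
	allowedOn := tryPreempt(&on)
	fmt.Println("disabled:", allowedOff, "remaining:", off)
	fmt.Println("enabled:", allowedOn, "remaining:", on)
}
```

Zeroing the budget keeps the call sites unchanged; whether the flag belongs at the partition, queue, or scheduler level is a separate design question.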





[GitHub] [yunikorn-core] craigcondit commented on a diff in pull request #511: [YUNIKORN-1467] Implement core preemption logic

Posted by "craigcondit (via GitHub)" <gi...@apache.org>.
craigcondit commented on code in PR #511:
URL: https://github.com/apache/yunikorn-core/pull/511#discussion_r1147688681


##########
pkg/scheduler/objects/preemption.go:
##########
@@ -0,0 +1,875 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package objects
+
+import (
+	"sort"
+	"sync"
+	"time"
+
+	"go.uber.org/zap"
+
+	"github.com/apache/yunikorn-core/pkg/common/resources"
+	"github.com/apache/yunikorn-core/pkg/log"
+	"github.com/apache/yunikorn-core/pkg/plugins"
+	"github.com/apache/yunikorn-scheduler-interface/lib/go/api"
+	"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
+)
+
+var (
+	preemptAttemptFrequency        = 15 * time.Second
+	preemptCheckConcurrency        = 10
+	scoreFitMax             uint64 = 1 << 32
+	scoreOriginator         uint64 = 1 << 33
+	scoreNoPreempt          uint64 = 1 << 34
+	scoreUnfit              uint64 = 1 << 35
+)
+
+// Preemptor encapsulates the functionality required for preemption victim selection
+type Preemptor struct {
+	application     *Application        // application containing ask
+	queue           *Queue              // queue to preempt for
+	queuePath       string              // path of queue to preempt for
+	headRoom        *resources.Resource // current queue headroom
+	preemptionDelay time.Duration       // preemption delay
+	ask             *AllocationAsk      // ask to be preempted for
+	iterator        NodeIterator        // iterator to enumerate all nodes
+	nodesTried      bool                // flag indicating that scheduling has already been tried on all nodes
+
+	// lazily-populated work structures
+	allocationsByQueue map[string]*QueuePreemptionSnapshot // map of queue snapshots by queue path
+	queueByAlloc       map[string]*QueuePreemptionSnapshot // map of queue snapshots by allocationID
+	allocationsByNode  map[string][]*Allocation            // map of allocation by nodeID
+	nodeAvailableMap   map[string]*resources.Resource      // map of available resources by nodeID
+}
+
+// QueuePreemptionSnapshot is used to track a snapshot of a queue for preemption
+type QueuePreemptionSnapshot struct {
+	Parent             *QueuePreemptionSnapshot // snapshot of parent queue
+	QueuePath          string                   // fully qualified path to queue
+	AllocatedResource  *resources.Resource      // allocated resources
+	PreemptingResource *resources.Resource      // resources currently flagged for preemption
+	MaxResource        *resources.Resource      // maximum resources for this queue
+	GuaranteedResource *resources.Resource      // guaranteed resources for this queue
+	PotentialVictims   []*Allocation            // list of allocations which could be preempted
+}
+
+// NewPreemptor creates a new preemptor. The preemptor itself is not thread safe, and assumes the application lock is held.
+func NewPreemptor(application *Application, headRoom *resources.Resource, preemptionDelay time.Duration, ask *AllocationAsk, iterator NodeIterator, nodesTried bool) *Preemptor {
+	return &Preemptor{
+		application:     application,
+		queue:           application.queue,
+		queuePath:       application.queuePath,
+		headRoom:        headRoom,
+		preemptionDelay: preemptionDelay,
+		ask:             ask,
+		iterator:        iterator,
+		nodesTried:      nodesTried,
+	}
+}
+
+// CheckPreconditions performs simple sanity checks designed to determine if preemption should be attempted
+// for an ask. If checks succeed, updates the ask preemption check time.
+func (p *Preemptor) CheckPreconditions() bool {
+	now := time.Now()
+
+	// skip if ask is not allowed to preempt other tasks
+	if !p.ask.IsAllowPreemptOther() {
+		return false
+	}
+
+	// skip if ask has previously triggered preemption
+	if p.ask.HasTriggeredPreemption() {
+		return false
+	}
+
+	// skip if ask requires a specific node (this should be handled by required node preemption algorithm)
+	if p.ask.GetRequiredNode() != "" {
+		return false
+	}
+
+	// skip if preemption delay has not yet passed
+	if now.Before(p.ask.GetCreateTime().Add(p.preemptionDelay)) {
+		return false
+	}
+
+	// skip if attempt frequency hasn't been reached again
+	if now.Before(p.ask.GetPreemptCheckTime().Add(preemptAttemptFrequency)) {
+		return false
+	}
+
+	// mark this ask as having been checked recently to avoid doing extra work in the next scheduling cycle
+	p.ask.UpdatePreemptCheckTime()
+
+	return true
+}
+
+// initQueueSnapshots ensures that snapshots have been taken of the queue
+func (p *Preemptor) initQueueSnapshots() {
+	if p.allocationsByQueue != nil {
+		return
+	}
+
+	p.allocationsByQueue = p.queue.FindEligiblePreemptionVictims(p.queuePath, p.ask)
+}
+
+// initWorkingState builds helper data structures required to compute a solution
+func (p *Preemptor) initWorkingState() {
+	// return if we have already run
+	if p.nodeAvailableMap != nil {
+		return
+	}
+
+	// ensure queue snapshots are populated
+	p.initQueueSnapshots()
+
+	allocationsByNode := make(map[string][]*Allocation)
+	queueByAlloc := make(map[string]*QueuePreemptionSnapshot)
+	nodeAvailableMap := make(map[string]*resources.Resource)
+
+	// build a map from NodeID to allocation and from allocationID to queue capacities
+	for _, victims := range p.allocationsByQueue {
+		for _, allocation := range victims.PotentialVictims {
+			nodeID := allocation.GetNodeID()
+			allocations, ok := allocationsByNode[nodeID]
+			if !ok {
+				allocations = make([]*Allocation, 0)
+			}
+			allocationsByNode[nodeID] = append(allocations, allocation)
+			queueByAlloc[allocation.GetAllocationKey()] = victims
+		}
+	}
+
+	// walk node iterator and track available resources per node
+	for p.iterator.HasNext() {
+		node := p.iterator.Next()
+		if !node.IsSchedulable() || (node.IsReserved() && !node.isReservedForApp(reservationKey(nil, p.application, p.ask))) {
+			// node is not available, remove any potential victims from consideration
+			delete(allocationsByNode, node.NodeID)
+		} else {
+			// track allocated and available resources
+			nodeAvailableMap[node.NodeID] = node.GetAvailableResource()
+		}
+	}
+
+	// sort the allocations on each node in the order we'd like to try them
+	sortVictimsForPreemption(allocationsByNode)
+
+	p.allocationsByNode = allocationsByNode
+	p.queueByAlloc = queueByAlloc
+	p.nodeAvailableMap = nodeAvailableMap
+}
+
+// checkPreemptionQueueGuarantees verifies that it's possible to free enough resources to fit the given ask
+func (p *Preemptor) checkPreemptionQueueGuarantees() bool {
+	p.initQueueSnapshots()
+
+	queues := p.duplicateQueueSnapshots()
+	currentQueue, ok := queues[p.queuePath]
+	if !ok {
+		log.Logger().Warn("BUG: Didn't find current queue in snapshot list",
+			zap.String("queuePath", p.queuePath))
+		return false
+	}
+
+	currentQueue.AddAllocation(p.ask.GetAllocatedResource())
+
+	// remove each allocation in turn, validating that at some point we free enough resources to allow this ask to fit
+	for _, snapshot := range queues {
+		for _, alloc := range snapshot.PotentialVictims {
+			snapshot.RemoveAllocation(alloc.GetAllocatedResource())
+			if currentQueue.IsWithinGuaranteedResource() {
+				return true
+			}
+		}
+	}
+
+	return false
+}
+
+// calculateVictimsByNode takes a list of potential victims for a node and builds a list ready for the RM to process.
+// Result is a list of allocations and the starting index to check for the initial preemption list.
+// If the result is nil, the node should not be considered for preemption.
+func (p *Preemptor) calculateVictimsByNode(nodeAvailable *resources.Resource, potentialVictims []*Allocation) (int, []*Allocation) {
+	nodeCurrentAvailable := nodeAvailable.Clone()
+	allocationsByQueueSnap := p.duplicateQueueSnapshots()
+
+	// Initial check: Will allocation fit on node without preemption? This is possible if preemption was triggered due
+	// to queue limits and not node resource limits.
+	if resources.FitIn(nodeCurrentAvailable, p.ask.GetAllocatedResource()) {
+		// return empty list so this node is considered for preemption
+		return -1, make([]*Allocation, 0)
+	}
+
+	// get the current queue snapshot
+	askQueue, ok := allocationsByQueueSnap[p.queuePath]
+	if !ok {
+		log.Logger().Warn("BUG: Queue not found by name", zap.String("queuePath", p.queuePath))
+		return -1, nil
+	}
+
+	// speculatively add the current ask
+	askQueue.AddAllocation(p.ask.GetAllocatedResource())
+
+	// First pass: Check each task to see whether we are able to reduce our shortfall by preempting each
+	// task in turn, and filter out tasks which will cause their queue to drop below guaranteed capacity.
+	// If a task could be preempted without violating queue constraints, add it to either the 'head' list or the
+	// 'tail' list depending on whether the shortfall is reduced. If added to the 'head' list, adjust the node available
+	// capacity and the queue guaranteed headroom.
+	head := make([]*Allocation, 0)
+	tail := make([]*Allocation, 0)
+	for _, victim := range potentialVictims {
+		// check to see if removing this task will keep queue above guaranteed amount; if not, skip to the next one
+		if qv, ok := p.queueByAlloc[victim.GetAllocationKey()]; ok {
+			if queueSnapshot, ok2 := allocationsByQueueSnap[qv.QueuePath]; ok2 {
+				queueSnapshot.RemoveAllocation(victim.GetAllocatedResource())
+				// did removing this allocation still keep the queue over-allocated?
+				if queueSnapshot.IsAtOrAboveGuaranteedResource() {
+					// check to see if the shortfall on the node has changed
+					shortfall := resources.SubEliminateNegative(p.ask.GetAllocatedResource(), nodeCurrentAvailable)
+					newAvailable := resources.Add(nodeCurrentAvailable, victim.GetAllocatedResource())
+					newShortfall := resources.SubEliminateNegative(p.ask.GetAllocatedResource(), newAvailable)
+					if resources.EqualsOrEmpty(shortfall, newShortfall) {
+						// shortfall did not change, so task should only be considered as a last resort
+						queueSnapshot.AddAllocation(victim.GetAllocatedResource())
+						tail = append(tail, victim)
+					} else {
+						// shortfall was decreased, so we should keep this task on the main list and adjust usage
+						nodeCurrentAvailable.AddTo(victim.GetAllocatedResource())
+						head = append(head, victim)
+					}
+				} else {
+					// removing this allocation would have reduced queue below guaranteed limits, put it back
+					queueSnapshot.AddAllocation(victim.GetAllocatedResource())
+				}
+			}
+		}
+	}
+	// merge lists
+	head = append(head, tail...)
+	if len(head) == 0 {
+		return -1, nil
+	}
+
+	// clone again
+	nodeCurrentAvailable = nodeAvailable.Clone()
+	allocationsByQueueSnap = p.duplicateQueueSnapshots()
+
+	// get the current queue snapshot
+	askQueue, ok2 := allocationsByQueueSnap[p.queuePath]
+	if !ok2 {
+		log.Logger().Warn("BUG: Queue not found by name", zap.String("queuePath", p.queuePath))
+		return -1, nil
+	}
+
+	// speculatively add the current ask
+	askQueue.AddAllocation(p.ask.GetAllocatedResource())
+
+	// Second pass: The task ordering can no longer change. For each task, check that queue constraints would not be
+	// violated if the task were to be preempted. If so, discard the task. If the task can be preempted, adjust
+	// both the node available capacity and the queue headroom. Save the Index within the results of the first task
+	// which would reduce the shortfall to zero.
+	results := make([]*Allocation, 0)
+	index := -1
+	for _, victim := range head {
+		// check to see if removing this task will keep queue above guaranteed amount; if not, skip to the next one
+		if qv, ok := p.queueByAlloc[victim.GetAllocationKey()]; ok {
+			if queueSnapshot, ok2 := allocationsByQueueSnap[qv.QueuePath]; ok2 {
+				queueSnapshot.RemoveAllocation(victim.GetAllocatedResource())
+				if queueSnapshot.IsAtOrAboveGuaranteedResource() {
+					// removing task does not violate queue constraints, adjust queue and node
+					nodeCurrentAvailable.AddTo(victim.GetAllocatedResource())
+					// check if ask now fits and we haven't had this happen before
+					if nodeCurrentAvailable.FitInMaxUndef(p.ask.GetAllocatedResource()) && index < 0 {
+						index = len(results)
+					}
+					// add victim to results
+					results = append(results, victim)
+				} else {
+					// add back resources
+					queueSnapshot.AddAllocation(victim.GetAllocatedResource())
+				}
+			}
+		}
+	}
+
+	// check to see if enough resources were freed
+	if index < 0 {
+		return -1, nil
+	}
+
+	return index, results
+}
+
+func (p *Preemptor) duplicateQueueSnapshots() map[string]*QueuePreemptionSnapshot {
+	cache := make(map[string]*QueuePreemptionSnapshot, 0)
+	for _, snapshot := range p.allocationsByQueue {
+		snapshot.Duplicate(cache)
+	}
+	return cache
+}
+
+// checkPreemptionPredicates calls the shim via the SI to evaluate nodes for preemption
+func (p *Preemptor) checkPreemptionPredicates(predicateChecks []*si.PreemptionPredicatesArgs, victimsByNode map[string][]*Allocation) *predicateCheckResult {
+	// don't process empty list
+	if len(predicateChecks) == 0 {
+		return nil
+	}
+
+	// sort predicate checks by number of expected preempted tasks
+	sort.SliceStable(predicateChecks, func(i int, j int) bool {
+		return predicateChecks[i].StartIndex < predicateChecks[j].StartIndex
+	})
+
+	// check for RM callback
+	plugin := plugins.GetResourceManagerCallbackPlugin()
+	if plugin == nil {
+		// if a plugin isn't registered, assume checks will succeed and synthesize a result
+		check := predicateChecks[0]
+		log.Logger().Debug("No RM callback plugin registered, using first selected node for preemption",
+			zap.String("NodeID", check.NodeID),
+			zap.String("AllocationKey", check.AllocationKey))
+
+		result := &predicateCheckResult{
+			allocationKey: check.AllocationKey,
+			nodeID:        check.NodeID,
+			success:       true,
+			index:         int(check.StartIndex),
+		}
+		result.populateVictims(victimsByNode)
+		return result
+	}
+
+	// process each batch of checks by sending to the RM
+	batches := batchPreemptionChecks(predicateChecks, preemptCheckConcurrency)
+	var bestResult *predicateCheckResult = nil
+	for _, batch := range batches {
+		var wg sync.WaitGroup
+		ch := make(chan *predicateCheckResult, len(batch))
+		expected := 0
+		for _, args := range batch {
+			// add goroutine for checking preemption
+			wg.Add(1)
+			expected++
+			go preemptPredicateCheck(plugin, ch, &wg, args)
+		}
+		// wait for completion and close channel
+		go func() {
+			wg.Wait()
+			close(ch)
+		}()
+		for result := range ch {
+			// if result is successful, keep track of it
+			if result.success {
+				if bestResult == nil {
+					bestResult = result
+				} else if result.betterThan(bestResult, p.allocationsByNode) {
+					bestResult = result

Review Comment:
   This is used to compare two potential "solutions" (i.e. victims per node). For simplicity, the existing tests usually use a single node. I can add some specific tests around the sorting logic.
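
A multi-node test of the comparison might look like the sketch below. The ordering rule here is an assumption chosen for illustration (fewer victims wins); the actual `betterThan` logic is not shown in this thread and may weigh other factors. `candidate` and `pickBest` are hypothetical stand-ins for `predicateCheckResult` and the result-collection loop.

```go
package main

import "fmt"

// candidate is a hypothetical stand-in for predicateCheckResult.
type candidate struct {
	nodeID  string
	victims int  // allocations that would be preempted on this node
	success bool // whether the predicate check passed
}

// betterThan uses an assumed rule: a solution that preempts fewer
// allocations is preferred. The real comparison may differ.
func (c candidate) betterThan(o candidate) bool {
	return c.victims < o.victims
}

// pickBest keeps the best successful candidate, in the same style as the
// bestResult loop in checkPreemptionPredicates. The second return value
// is false when no candidate succeeded.
func pickBest(results []candidate) (candidate, bool) {
	var best candidate
	found := false
	for _, r := range results {
		if !r.success {
			continue
		}
		if !found || r.betterThan(best) {
			best, found = r, true
		}
	}
	return best, found
}

func main() {
	results := []candidate{
		{nodeID: "node-1", victims: 3, success: true},
		{nodeID: "node-2", victims: 1, success: false},
		{nodeID: "node-3", victims: 2, success: true},
	}
	best, found := pickBest(results)
	fmt.Println(found, best.nodeID, best.victims)
}
```

Here `node-2` has the fewest victims but failed its check, so the expected winner is `node-3`; cases like this only show up once a test uses more than one node.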





[GitHub] [yunikorn-core] craigcondit commented on a diff in pull request #511: [YUNIKORN-1467] Implement core preemption logic

Posted by "craigcondit (via GitHub)" <gi...@apache.org>.
craigcondit commented on code in PR #511:
URL: https://github.com/apache/yunikorn-core/pull/511#discussion_r1147693751


##########
pkg/scheduler/objects/queue.go:
##########
@@ -41,29 +40,32 @@ import (
 	"github.com/apache/yunikorn-scheduler-interface/lib/go/common"
 )
 
+var (
+	maxPreemptionsPerQueue = 10 // maximum number of asks to attempt to preempt for in a single queue
+)
+
 // Queue structure inside Scheduler
 type Queue struct {
 	QueuePath string // Fully qualified path for the queue
 	Name      string // Queue name as in the config etc.
 
 	// Private fields need protection
-	sortType                  policies.SortPolicy       // How applications (leaf) or queues (parents) are sorted
-	children                  map[string]*Queue         // Only for direct children, parent queue only
-	childPriorities           map[string]int32          // cached priorities for child queues
-	childPreemptionPriorities map[string]int32          // cached preemption priorities for child queues
-	applications              map[string]*Application   // only for leaf queue
-	appPriorities             map[string]int32          // cached priorities for application
-	appPreemptionPriorities   map[string]int32          // cached preemption priorities for application
-	reservedApps              map[string]int            // applications reserved within this queue, with reservation count
-	parent                    *Queue                    // link back to the parent in the scheduler
-	pending                   *resources.Resource       // pending resource for the apps in the queue
-	prioritySortEnabled       bool                      // whether priority is used for request sorting
-	priorityPolicy            policies.PriorityPolicy   // priority policy
-	priorityOffset            int32                     // priority offset for this queue relative to others
-	preemptionPolicy          policies.PreemptionPolicy // preemption policy
-	preemptionDelay           time.Duration             // time before preemption is considered
-	currentPriority           int32                     // the current scheduling priority of this queue
-	preemptionPriority        int32                     // the current preemption priority of this queue
+	sortType            policies.SortPolicy       // How applications (leaf) or queues (parents) are sorted
+	children            map[string]*Queue         // Only for direct children, parent queue only
+	childPriorities     map[string]int32          // cached priorities for child queues
+	applications        map[string]*Application   // only for leaf queue
+	appPriorities       map[string]int32          // cached priorities for application
+	reservedApps        map[string]int            // applications reserved within this queue, with reservation count
+	parent              *Queue                    // link back to the parent in the scheduler
+	pending             *resources.Resource       // pending resource for the apps in the queue
+	allocatedResource   *resources.Resource       // allocated resource for the apps in the queue
+	preemptingResource  *resources.Resource       // preempting resource for the apps in the queue

Review Comment:
   I'll see what I can do.





[GitHub] [yunikorn-core] codecov[bot] commented on pull request #511: [YUNIKORN-1467] [WIP] Implement priority-based victim selection algorithm

Posted by "codecov[bot] (via GitHub)" <gi...@apache.org>.
codecov[bot] commented on PR #511:
URL: https://github.com/apache/yunikorn-core/pull/511#issuecomment-1444097281

   # [Codecov](https://codecov.io/gh/apache/yunikorn-core/pull/511?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#511](https://codecov.io/gh/apache/yunikorn-core/pull/511?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (08c2dac) into [master](https://codecov.io/gh/apache/yunikorn-core/commit/87aff78ca890a4b3feabc3d5b3f91a9862ff1024?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (87aff78) will **decrease** coverage by `3.50%`.
   > The diff coverage is `13.21%`.
   
   ```diff
   @@            Coverage Diff             @@
   ##           master     #511      +/-   ##
   ==========================================
   - Coverage   73.42%   69.93%   -3.50%     
   ==========================================
     Files          69       70       +1     
     Lines       10449    10859     +410     
   ==========================================
   - Hits         7672     7594      -78     
   - Misses       2529     3021     +492     
   + Partials      248      244       -4     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/yunikorn-core/pull/511?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [pkg/common/configs/config.go](https://codecov.io/gh/apache/yunikorn-core/pull/511?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGtnL2NvbW1vbi9jb25maWdzL2NvbmZpZy5nbw==) | `100.00% <ø> (ø)` | |
   | [pkg/scheduler/objects/application.go](https://codecov.io/gh/apache/yunikorn-core/pull/511?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGtnL3NjaGVkdWxlci9vYmplY3RzL2FwcGxpY2F0aW9uLmdv) | `54.62% <0.00%> (-3.84%)` | :arrow_down: |
   | [pkg/scheduler/objects/preemption.go](https://codecov.io/gh/apache/yunikorn-core/pull/511?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGtnL3NjaGVkdWxlci9vYmplY3RzL3ByZWVtcHRpb24uZ28=) | `0.00% <0.00%> (ø)` | |
   | [pkg/scheduler/partition.go](https://codecov.io/gh/apache/yunikorn-core/pull/511?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGtnL3NjaGVkdWxlci9wYXJ0aXRpb24uZ28=) | `77.16% <ø> (-0.07%)` | :arrow_down: |
   | [pkg/scheduler/tests/mock\_rm\_callback.go](https://codecov.io/gh/apache/yunikorn-core/pull/511?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGtnL3NjaGVkdWxlci90ZXN0cy9tb2NrX3JtX2NhbGxiYWNrLmdv) | `20.00% <0.00%> (-8.58%)` | :arrow_down: |
   | [pkg/scheduler/objects/queue.go](https://codecov.io/gh/apache/yunikorn-core/pull/511?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGtnL3NjaGVkdWxlci9vYmplY3RzL3F1ZXVlLmdv) | `65.48% <18.42%> (-11.37%)` | :arrow_down: |
   | [pkg/common/utils.go](https://codecov.io/gh/apache/yunikorn-core/pull/511?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGtnL2NvbW1vbi91dGlscy5nbw==) | `86.04% <100.00%> (-0.63%)` | :arrow_down: |
   | [pkg/scheduler/objects/allocation\_ask.go](https://codecov.io/gh/apache/yunikorn-core/pull/511?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGtnL3NjaGVkdWxlci9vYmplY3RzL2FsbG9jYXRpb25fYXNrLmdv) | `87.68% <100.00%> (+1.17%)` | :arrow_up: |
   | [pkg/scheduler/objects/required\_node\_preemptor.go](https://codecov.io/gh/apache/yunikorn-core/pull/511?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGtnL3NjaGVkdWxlci9vYmplY3RzL3JlcXVpcmVkX25vZGVfcHJlZW1wdG9yLmdv) | `98.86% <100.00%> (ø)` | |
   | [pkg/webservice/handlers.go](https://codecov.io/gh/apache/yunikorn-core/pull/511?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGtnL3dlYnNlcnZpY2UvaGFuZGxlcnMuZ28=) | `77.16% <100.00%> (-0.04%)` | :arrow_down: |
   | ... and [6 more](https://codecov.io/gh/apache/yunikorn-core/pull/511?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   




[GitHub] [yunikorn-core] pbacsko commented on a diff in pull request #511: [YUNIKORN-1467] Implement core preemption logic

Posted by "pbacsko (via GitHub)" <gi...@apache.org>.
pbacsko commented on code in PR #511:
URL: https://github.com/apache/yunikorn-core/pull/511#discussion_r1147385690


##########
pkg/scheduler/objects/application.go:
##########
@@ -903,8 +901,21 @@ func (sa *Application) tryAllocate(headRoom *resources.Resource, nodeIterator fu
 		if sa.canReplace(request) {
 			continue
 		}
-		// resource must fit in headroom otherwise skip the request
+
+		// resource must fit in headroom otherwise skip the request (unless preemption could help)
 		if !headRoom.FitInMaxUndef(request.GetAllocatedResource()) {
+			// attempt preemption
+			if *preemptAttemptsRemaining > 0 {
+				*preemptAttemptsRemaining--
+				fullIterator := fullNodeIterator()
+				if fullIterator != nil {

Review Comment:
   When is `fullIterator == nil`? Is this a legit case? I don't see coverage in the unit tests.
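A minimal sketch of how the nil branch could be covered in a unit test, assuming the iterator is produced by a factory function as in the diff above. `NodeIterator`, `emptyIterator` and `attemptPreemption` are hypothetical stand-ins for illustration, not the types or functions used in the PR:

```go
package main

import "fmt"

// NodeIterator is a hypothetical stand-in for the scheduler's node iterator.
type NodeIterator interface {
	HasNext() bool
}

// emptyIterator is a trivial iterator that never yields a node.
type emptyIterator struct{}

func (emptyIterator) HasNext() bool { return false }

// attemptPreemption mirrors the shape of the guarded block in the diff:
// it spends one attempt, then proceeds only if the factory returns a
// non-nil iterator. It reports whether the preemption path was entered.
func attemptPreemption(attemptsRemaining *int, fullNodeIterator func() NodeIterator) bool {
	if *attemptsRemaining <= 0 {
		return false
	}
	*attemptsRemaining--
	it := fullNodeIterator()
	return it != nil
}

func main() {
	attempts := 2
	nilFactory := func() NodeIterator { return nil }
	emptyFactory := func() NodeIterator { return emptyIterator{} }

	// The nil factory still consumes an attempt but skips preemption.
	fmt.Println(attemptPreemption(&attempts, nilFactory), attempts)
	// A non-nil iterator enters the preemption path.
	fmt.Println(attemptPreemption(&attempts, emptyFactory), attempts)
	// With the budget exhausted, nothing is attempted.
	fmt.Println(attemptPreemption(&attempts, emptyFactory), attempts)
}
```

Passing a factory that returns nil is enough to exercise the branch. In this sketch the attempt counter is decremented before the nil check, matching the order in the diff, so a nil iterator still uses up part of the budget.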





[GitHub] [yunikorn-core] pbacsko commented on a diff in pull request #511: [YUNIKORN-1467] Implement core preemption logic

Posted by "pbacsko (via GitHub)" <gi...@apache.org>.
pbacsko commented on code in PR #511:
URL: https://github.com/apache/yunikorn-core/pull/511#discussion_r1147406728


##########
pkg/scheduler/objects/preemption.go:
##########
@@ -0,0 +1,875 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package objects
+
+import (
+	"sort"
+	"sync"
+	"time"
+
+	"go.uber.org/zap"
+
+	"github.com/apache/yunikorn-core/pkg/common/resources"
+	"github.com/apache/yunikorn-core/pkg/log"
+	"github.com/apache/yunikorn-core/pkg/plugins"
+	"github.com/apache/yunikorn-scheduler-interface/lib/go/api"
+	"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
+)
+
+var (
+	preemptAttemptFrequency        = 15 * time.Second
+	preemptCheckConcurrency        = 10
+	scoreFitMax             uint64 = 1 << 32
+	scoreOriginator         uint64 = 1 << 33
+	scoreNoPreempt          uint64 = 1 << 34
+	scoreUnfit              uint64 = 1 << 35

Review Comment:
   Two questions:
   1. Shouldn't these be in a const section?
   2. What's the reason for them starting at 1 << 32?
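To make both questions concrete, here is a minimal sketch of the score values declared as typed constants. The `score` helper and the interpretation of the bit layout are assumptions for illustration only: one possible reading is that the low 32 bits hold a fit value while each flag occupies its own bit above them, so any flagged score sorts after every unflagged one. The PR does not state this; it is a guess at the intent.

```go
package main

import (
	"fmt"
	"math"
)

// Same values as in the diff, but declared as constants.
const (
	scoreFitMax     uint64 = 1 << 32
	scoreOriginator uint64 = 1 << 33
	scoreNoPreempt  uint64 = 1 << 34
	scoreUnfit      uint64 = 1 << 35
)

// score is a hypothetical helper: it keeps a fit value in the low 32 bits
// and sets one high bit per penalty flag. Lower scores sort first.
func score(fit uint32, originator, noPreempt, unfit bool) uint64 {
	s := uint64(fit)
	if originator {
		s |= scoreOriginator
	}
	if noPreempt {
		s |= scoreNoPreempt
	}
	if unfit {
		s |= scoreUnfit
	}
	return s
}

func main() {
	worstUnflagged := score(math.MaxUint32, false, false, false)
	bestFlagged := score(0, true, false, false)

	// Every uint32 fit value stays below scoreFitMax.
	fmt.Println(worstUnflagged < scoreFitMax)
	// Any flagged score is larger than any unflagged score.
	fmt.Println(worstUnflagged < bestFlagged)
}
```

One reason the values might be kept in a `var` block is so that tests can override them, which constants would not allow. That is also an assumption, and only applies if tests actually modify them.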





[GitHub] [yunikorn-core] pbacsko commented on a diff in pull request #511: [YUNIKORN-1467] Implement core preemption logic

Posted by "pbacsko (via GitHub)" <gi...@apache.org>.
pbacsko commented on code in PR #511:
URL: https://github.com/apache/yunikorn-core/pull/511#discussion_r1147595388


##########
pkg/scheduler/objects/queue.go:
##########
@@ -41,29 +40,32 @@ import (
 	"github.com/apache/yunikorn-scheduler-interface/lib/go/common"
 )
 
+var (
+	maxPreemptionsPerQueue = 10 // maximum number of asks to attempt to preempt for in a single queue
+)
+
 // Queue structure inside Scheduler
 type Queue struct {
 	QueuePath string // Fully qualified path for the queue
 	Name      string // Queue name as in the config etc.
 
 	// Private fields need protection
-	sortType                  policies.SortPolicy       // How applications (leaf) or queues (parents) are sorted
-	children                  map[string]*Queue         // Only for direct children, parent queue only
-	childPriorities           map[string]int32          // cached priorities for child queues
-	childPreemptionPriorities map[string]int32          // cached preemption priorities for child queues
-	applications              map[string]*Application   // only for leaf queue
-	appPriorities             map[string]int32          // cached priorities for application
-	appPreemptionPriorities   map[string]int32          // cached preemption priorities for application
-	reservedApps              map[string]int            // applications reserved within this queue, with reservation count
-	parent                    *Queue                    // link back to the parent in the scheduler
-	pending                   *resources.Resource       // pending resource for the apps in the queue
-	prioritySortEnabled       bool                      // whether priority is used for request sorting
-	priorityPolicy            policies.PriorityPolicy   // priority policy
-	priorityOffset            int32                     // priority offset for this queue relative to others
-	preemptionPolicy          policies.PreemptionPolicy // preemption policy
-	preemptionDelay           time.Duration             // time before preemption is considered
-	currentPriority           int32                     // the current scheduling priority of this queue
-	preemptionPriority        int32                     // the current preemption priority of this queue
+	sortType            policies.SortPolicy       // How applications (leaf) or queues (parents) are sorted
+	children            map[string]*Queue         // Only for direct children, parent queue only
+	childPriorities     map[string]int32          // cached priorities for child queues
+	applications        map[string]*Application   // only for leaf queue
+	appPriorities       map[string]int32          // cached priorities for application
+	reservedApps        map[string]int            // applications reserved within this queue, with reservation count
+	parent              *Queue                    // link back to the parent in the scheduler
+	pending             *resources.Resource       // pending resource for the apps in the queue
+	allocatedResource   *resources.Resource       // allocated resource for the apps in the queue
+	preemptingResource  *resources.Resource       // preempting resource for the apps in the queue

Review Comment:
   The thing I'd really like to see tested (possibly in a future test case in `partition_test.go`) is this resource tracking: after a completed preemption, this value should go back to zero. I'm not sure whether we have something similar that checks `allocatedResource`, but for this one I think it's great to have.
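A rough sketch of the kind of check meant here, using a toy model rather than the real scheduler objects. `Resource`, `queue`, `markPreempting` and `releaseVictim` are hypothetical names invented for this example; the actual test would go through the partition and queue APIs instead:

```go
package main

import "fmt"

// Resource is a hypothetical stand-in for resources.Resource:
// a quantity per resource name.
type Resource map[string]int64

func (r Resource) add(o Resource) {
	for k, v := range o {
		r[k] += v
	}
}

func (r Resource) sub(o Resource) {
	for k, v := range o {
		r[k] -= v
	}
}

func (r Resource) isZero() bool {
	for _, v := range r {
		if v != 0 {
			return false
		}
	}
	return true
}

// queue tracks allocated and in-flight preempting resources.
type queue struct {
	allocated  Resource
	preempting Resource
}

func newQueue() *queue {
	return &queue{allocated: Resource{}, preempting: Resource{}}
}

// markPreempting records that a victim of the given size was selected.
func (q *queue) markPreempting(victim Resource) {
	q.preempting.add(victim)
}

// releaseVictim records that the victim was actually released.
func (q *queue) releaseVictim(victim Resource) {
	q.preempting.sub(victim)
	q.allocated.sub(victim)
}

func main() {
	q := newQueue()
	victim := Resource{"memory": 2048, "vcore": 2}
	q.allocated.add(victim)

	q.markPreempting(victim)
	fmt.Println(q.preempting.isZero()) // preemption still in flight

	q.releaseVictim(victim)
	fmt.Println(q.preempting.isZero(), q.allocated.isZero())
}
```

The assertion that matters is the last one: once the victim is released, both the preempting and the allocated totals should be back at zero.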


