Posted to reviews@yunikorn.apache.org by GitBox <gi...@apache.org> on 2020/11/18 02:21:50 UTC

[GitHub] [incubator-yunikorn-core] yangwwei commented on a change in pull request #221: [YUNIKORN-317] Cache removal and refactoring

yangwwei commented on a change in pull request #221:
URL: https://github.com/apache/incubator-yunikorn-core/pull/221#discussion_r525573602



##########
File path: pkg/interfaces/interfaces.go
##########
@@ -16,46 +16,15 @@
  limitations under the License.
 */
 
-package common
-
-import (
-	"context"
-	"sync"
-)
-
-type DoWorkPieceFunc func(piece int)
-
-// Copy from k8s.
-func ParallelizeUntil(ctx context.Context, workers, pieces int, doWorkPiece DoWorkPieceFunc) {
-	var stop <-chan struct{}
-	if ctx != nil {
-		stop = ctx.Done()
-	}
-
-	toProcess := make(chan int, pieces)
-	for i := 0; i < pieces; i++ {
-		toProcess <- i
-	}
-	close(toProcess)
-
-	if pieces < workers {
-		workers = pieces
-	}
-
-	wg := sync.WaitGroup{}
-	wg.Add(workers)
-	for i := 0; i < workers; i++ {
-		go func() {
-			defer wg.Done()
-			for piece := range toProcess {
-				select {
-				case <-stop:
-					return
-				default:
-					doWorkPiece(piece)
-				}
-			}
-		}()
-	}
-	wg.Wait()
+package interfaces

Review comment:
       Can we rename this file to `node_iterator.go`?
   This assumes we will add more interface files to this package.
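
   A minimal sketch of what the renamed file could contain, assuming the node iterator interface matches the `HasNext()`/`Next()` usage in the application code later in this diff (the `Reset()` method is an extra assumption); the real contents of `pkg/interfaces/interfaces.go` may differ:

```go
package interfaces

// NodeIterator iterates over a set of nodes for the scheduler.
// Sketch only: method names are inferred from the calls made in
// Application.tryNodes and Application.tryNodesNoReserve in this PR.
type NodeIterator interface {
	// HasNext returns true if there is a node left to iterate over.
	HasNext() bool
	// Next returns the next node; callers assert the concrete node type.
	Next() interface{}
	// Reset restarts the iterator (assumption, not visible in this diff).
	Reset()
}
```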

##########
File path: pkg/scheduler/objects/application.go
##########
@@ -0,0 +1,926 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package objects
+
+import (
+	"fmt"
+	"math"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/looplab/fsm"
+	"go.uber.org/zap"
+
+	"github.com/apache/incubator-yunikorn-core/pkg/common"
+	"github.com/apache/incubator-yunikorn-core/pkg/common/resources"
+	"github.com/apache/incubator-yunikorn-core/pkg/common/security"
+	"github.com/apache/incubator-yunikorn-core/pkg/events"
+	"github.com/apache/incubator-yunikorn-core/pkg/handler"
+	"github.com/apache/incubator-yunikorn-core/pkg/interfaces"
+	"github.com/apache/incubator-yunikorn-core/pkg/log"
+	"github.com/apache/incubator-yunikorn-core/pkg/rmproxy/rmevent"
+	"github.com/apache/incubator-yunikorn-scheduler-interface/lib/go/si"
+)
+
+var (
+	reservationDelay = 2 * time.Second
+	startingTimeout  = time.Minute * 5
+)
+
+type Application struct {
+	ApplicationID  string
+	Partition      string
+	QueueName      string
+	SubmissionTime time.Time
+
+	// Private fields need protection
+	queue             *Queue                    // queue the application is running in
+	pending           *resources.Resource       // pending resources from asks for the app
+	reservations      map[string]*reservation   // a map of reservations
+	requests          map[string]*AllocationAsk // a map of asks
+	sortedRequests    []*AllocationAsk
+	user              security.UserGroup     // owner of the application
+	tags              map[string]string      // application tags used in scheduling
+	allocatedResource *resources.Resource    // total allocated resources
+	allocations       map[string]*Allocation // list of all allocations
+	stateMachine      *fsm.FSM               // application state machine
+	stateTimer        *time.Timer            // timer for state time
+
+	rmEventHandler handler.EventHandler
+	rmID           string
+
+	sync.RWMutex
+}
+
+func newBlankApplication(appID, partition, queueName string, ugi security.UserGroup, tags map[string]string) *Application {
+	return &Application{
+		ApplicationID:     appID,
+		Partition:         partition,
+		QueueName:         queueName,
+		SubmissionTime:    time.Now(),
+		user:              ugi,
+		tags:              tags,
+		pending:           resources.NewResource(),
+		allocatedResource: resources.NewResource(),
+		requests:          make(map[string]*AllocationAsk),
+		reservations:      make(map[string]*reservation),
+		allocations:       make(map[string]*Allocation),
+		stateMachine:      NewAppState(),
+	}
+}
+
+func NewApplication(appID, partition, queueName string, ugi security.UserGroup, tags map[string]string, eventHandler handler.EventHandler, rmID string) *Application {
+	app := newBlankApplication(appID, partition, queueName, ugi, tags)
+	app.rmEventHandler = eventHandler
+	app.rmID = rmID
+	return app
+}
+
+func (sa *Application) String() string {
+	if sa == nil {
+		return "application is nil"
+	}
+	return fmt.Sprintf("ApplicationID: %s, Partition: %s, QueueName: %s, SubmissionTime: %v",
+		sa.ApplicationID, sa.Partition, sa.QueueName, sa.SubmissionTime)
+}
+
+// Set the reservation delay.
+// Set when the cluster context is created to disable reservation.
+func SetReservationDelay(delay time.Duration) {
+	log.Logger().Debug("Set reservation delay",
+		zap.Duration("delay", delay))
+	reservationDelay = delay
+}
+
+// Return the current state or a checked specific state for the application.
+// The state machine handles the locking.
+func (sa *Application) CurrentState() string {
+	return sa.stateMachine.Current()
+}
+
+func (sa *Application) IsStarting() bool {
+	return sa.stateMachine.Is(Starting.String())
+}
+
+func (sa *Application) IsAccepted() bool {
+	return sa.stateMachine.Is(Accepted.String())
+}
+
+func (sa *Application) IsNew() bool {
+	return sa.stateMachine.Is(New.String())
+}
+
+func (sa *Application) IsRunning() bool {
+	return sa.stateMachine.Is(Running.String())
+}
+
+func (sa *Application) IsWaiting() bool {
+	return sa.stateMachine.Is(Waiting.String())
+}
+
+// Handle the state event for the application.
+// The state machine handles the locking.
+func (sa *Application) HandleApplicationEvent(event applicationEvent) error {
+	err := sa.stateMachine.Event(event.String(), sa)
+	// handle the non-nil error returned for a same-state transition (limitation of the fsm library).
+	if err != nil && err.Error() == noTransition {
+		return nil
+	}
+	return err
+}
+
+func (sa *Application) OnStateChange(event *fsm.Event) {
+	updatedApps := make([]*si.UpdatedApplication, 0)
+	updatedApps = append(updatedApps, &si.UpdatedApplication{
+		ApplicationID:            sa.ApplicationID,
+		State:                    sa.stateMachine.Current(),
+		StateTransitionTimestamp: time.Now().UnixNano(),
+		Message:                  fmt.Sprintf("{Status change triggered by the event : %v}", event),
+	})
+
+	if sa.rmEventHandler != nil {
+		sa.rmEventHandler.HandleEvent(
+			&rmevent.RMApplicationUpdateEvent{
+				RmID:                 sa.rmID,
+				AcceptedApplications: make([]*si.AcceptedApplication, 0),
+				RejectedApplications: make([]*si.RejectedApplication, 0),
+				UpdatedApplications:  updatedApps,
+			})
+	}
+}
+
+// Set the starting timer to make sure the application will not get stuck in a starting state too long.
+// This prevents an app from not progressing to Running when it only has 1 allocation.
+// Called when entering the Starting state by the state machine.
+func (sa *Application) SetStartingTimer() {
+	log.Logger().Debug("Application Starting state timer initiated",
+		zap.String("appID", sa.ApplicationID),
+		zap.Duration("timeout", startingTimeout))
+	sa.stateTimer = time.AfterFunc(startingTimeout, sa.timeOutStarting)
+}
+
+// Clear the starting timer. If the application has progressed out of the starting state we need to stop the
+// timer and clean up.
+// Called when leaving the Starting state by the state machine.
+func (sa *Application) ClearStartingTimer() {
+	sa.stateTimer.Stop()
+	sa.stateTimer = nil
+}
+
+// In case of state aware scheduling we do not want to get stuck in starting as we might have an application that only
+// requires one allocation or is really slow asking for more than the first one.
+// This will progress the state of the application from Starting to Running
+func (sa *Application) timeOutStarting() {
+	// make sure we are still in the right state
+	// we could have been killed or something might have happened while waiting for a lock
+	if sa.IsStarting() {
+		log.Logger().Warn("Application in starting state timed out: auto progress",
+			zap.String("applicationID", sa.ApplicationID),
+			zap.String("state", sa.stateMachine.Current()))
+
+		//nolint: errcheck
+		_ = sa.HandleApplicationEvent(runApplication)
+	}
+}
+
+// Return an array of all reservation keys for the app.
+// This will return an empty array if there are no reservations.
+// Visible for tests
+func (sa *Application) GetReservations() []string {
+	sa.RLock()
+	defer sa.RUnlock()
+	keys := make([]string, 0)
+	for key := range sa.reservations {
+		keys = append(keys, key)
+	}
+	return keys
+}
+
+// Return the allocation ask for the key, nil if not found
+func (sa *Application) GetSchedulingAllocationAsk(allocationKey string) *AllocationAsk {
+	sa.RLock()
+	defer sa.RUnlock()
+	return sa.requests[allocationKey]
+}
+
+// Return the allocated resources for this application
+func (sa *Application) GetAllocatedResource() *resources.Resource {
+	sa.RLock()
+	defer sa.RUnlock()
+	return sa.allocatedResource.Clone()
+}
+
+// Return the pending resources for this application
+func (sa *Application) GetPendingResource() *resources.Resource {
+	sa.RLock()
+	defer sa.RUnlock()
+	return sa.pending
+}
+
+// Remove one or more allocation asks from this application.
+// This also removes any reservations that are linked to the ask.
+// The return value is the number of reservations released
+func (sa *Application) RemoveAllocationAsk(allocKey string) int {
+	sa.Lock()
+	defer sa.Unlock()
+	// shortcut no need to do anything
+	if len(sa.requests) == 0 {
+		return 0
+	}
+	var deltaPendingResource *resources.Resource = nil
+	// when allocation key not specified, cleanup all allocation ask
+	var toRelease int
+	if allocKey == "" {
+		// cleanup all reservations
+		for key, reserve := range sa.reservations {
+			releases, err := sa.unReserveInternal(reserve.node, reserve.ask)
+			if err != nil {
+				log.Logger().Warn("Removal of reservation failed while removing all allocation asks",
+					zap.String("appID", sa.ApplicationID),
+					zap.String("reservationKey", key),
+					zap.Error(err))
+				continue
+			}
+			// clean up the queue reservation (one at a time)
+			sa.queue.UnReserve(sa.ApplicationID, releases)
+			toRelease += releases
+		}
+		// Cleanup total pending resource
+		deltaPendingResource = sa.pending
+		sa.pending = resources.NewResource()
+		sa.requests = make(map[string]*AllocationAsk)
+	} else {
+		// cleanup the reservation for this allocation
+		for _, key := range sa.IsAskReserved(allocKey) {
+			reserve := sa.reservations[key]
+			releases, err := sa.unReserveInternal(reserve.node, reserve.ask)
+			if err != nil {
+				log.Logger().Warn("Removal of reservation failed while removing allocation ask",
+					zap.String("appID", sa.ApplicationID),
+					zap.String("reservationKey", key),
+					zap.Error(err))
+				continue
+			}
+			// clean up the queue reservation
+			sa.queue.UnReserve(sa.ApplicationID, releases)
+			toRelease += releases
+		}
+		if ask := sa.requests[allocKey]; ask != nil {
+			deltaPendingResource = resources.MultiplyBy(ask.AllocatedResource, float64(ask.GetPendingAskRepeat()))
+			sa.pending.SubFrom(deltaPendingResource)
+			delete(sa.requests, allocKey)
+		}
+	}
+	// clean up the queue pending resources
+	sa.queue.decPendingResource(deltaPendingResource)
+	// Check if we need to change state based on the ask removal:
+	// 1) if pending is zero (no more asks left)
+	// 2) if confirmed allocations is zero (nothing is running)
+	// Change the state to waiting.
+	// When the resource trackers are zero we should not expect anything to come in later.
+	if resources.IsZero(sa.pending) && resources.IsZero(sa.allocatedResource) {
+		if err := sa.HandleApplicationEvent(waitApplication); err != nil {
+			log.Logger().Warn("Application state not changed to Waiting while updating ask(s)",
+				zap.String("currentState", sa.CurrentState()),
+				zap.Error(err))
+		}
+	}
+
+	return toRelease
+}
+
+// Add an allocation ask to this application
+// If the ask already exists update the existing info
+func (sa *Application) AddAllocationAsk(ask *AllocationAsk) error {
+	sa.Lock()
+	defer sa.Unlock()
+	if ask == nil {
+		return fmt.Errorf("ask cannot be nil when added to app %s", sa.ApplicationID)
+	}
+	if ask.GetPendingAskRepeat() == 0 || resources.IsZero(ask.AllocatedResource) {
+		return fmt.Errorf("invalid ask added to app %s: %v", sa.ApplicationID, ask)
+	}
+	ask.setQueue(sa.queue.QueuePath)
+	delta := resources.Multiply(ask.AllocatedResource, int64(ask.GetPendingAskRepeat()))
+
+	var oldAskResource *resources.Resource = nil
+	if oldAsk := sa.requests[ask.AllocationKey]; oldAsk != nil {
+		oldAskResource = resources.Multiply(oldAsk.AllocatedResource, int64(oldAsk.GetPendingAskRepeat()))
+	}
+
+	// Check if we need to change state based on the ask added, there are two cases:
+	// 1) first ask added on a new app: state is New
+	// 2) all asks and allocation have been removed: state is Waiting
+	// Move the state and get it scheduling (again)
+	if sa.stateMachine.Is(New.String()) || sa.stateMachine.Is(Waiting.String()) {
+		if err := sa.HandleApplicationEvent(runApplication); err != nil {
+			log.Logger().Debug("Application state change failed while adding new ask",
+				zap.String("currentState", sa.CurrentState()),
+				zap.Error(err))
+		}
+	}
+	sa.requests[ask.AllocationKey] = ask
+
+	// Update total pending resource
+	delta.SubFrom(oldAskResource)
+	sa.pending.AddTo(delta)
+	sa.queue.incPendingResource(delta)
+
+	return nil
+}
+
+// Add the ask when a node allocation is recovered, maintaining the rule that an Allocation always has a
+// link to an AllocationAsk.
+// Safeguarded against a nil ask, but recovery generates the ask so it should never be nil.
+func (sa *Application) RecoverAllocationAsk(ask *AllocationAsk) {
+	sa.Lock()
+	defer sa.Unlock()
+	if ask == nil {
+		return
+	}
+	ask.setQueue(sa.queue.QueuePath)
+	sa.requests[ask.AllocationKey] = ask
+}
+
+func (sa *Application) updateAskRepeat(allocKey string, delta int32) (*resources.Resource, error) {
+	sa.Lock()
+	defer sa.Unlock()
+	if ask := sa.requests[allocKey]; ask != nil {
+		return sa.updateAskRepeatInternal(ask, delta)
+	}
+	return nil, fmt.Errorf("failed to locate ask with key %s", allocKey)
+}
+
+func (sa *Application) updateAskRepeatInternal(ask *AllocationAsk, delta int32) (*resources.Resource, error) {
+	// updating with delta does error checking internally
+	if !ask.UpdatePendingAskRepeat(delta) {
+		return nil, fmt.Errorf("ask repaeat not updated resulting repeat less than zero for ask %s on app %s", ask.AllocationKey, sa.ApplicationID)
+	}
+
+	deltaPendingResource := resources.Multiply(ask.AllocatedResource, int64(delta))
+	sa.pending.AddTo(deltaPendingResource)
+	// update the pending of the queue with the same delta
+	sa.queue.incPendingResource(deltaPendingResource)
+
+	return deltaPendingResource, nil
+}
+
+// Return if the application has any reservations.
+func (sa *Application) hasReserved() bool {
+	sa.RLock()
+	defer sa.RUnlock()
+	return len(sa.reservations) > 0
+}
+
+// Return if the application has the node reserved.
+// An empty nodeID is never reserved.
+func (sa *Application) IsReservedOnNode(nodeID string) bool {
+	if nodeID == "" {
+		return false
+	}
+	sa.RLock()
+	defer sa.RUnlock()
+	for key := range sa.reservations {
+		if strings.HasPrefix(key, nodeID) {
+			return true
+		}
+	}
+	return false
+}
+
+// Reserve the application for this node and ask combination.
+// If the reservation fails the function returns an error, nil is returned when the reservation is made.
+// If the node and ask combination was already reserved for the application this is a noop and returns nil.
+func (sa *Application) Reserve(node *Node, ask *AllocationAsk) error {
+	sa.Lock()
+	defer sa.Unlock()
+	// create the reservation (includes nil checks)
+	nodeReservation := newReservation(node, sa, ask, true)
+	if nodeReservation == nil {
+		log.Logger().Debug("reservation creation failed unexpectedly",
+			zap.String("app", sa.ApplicationID),
+			zap.Any("node", node),
+			zap.Any("ask", ask))
+		return fmt.Errorf("reservation creation failed node or ask are nil on appID %s", sa.ApplicationID)
+	}
+	allocKey := ask.AllocationKey
+	if sa.requests[allocKey] == nil {
+		log.Logger().Debug("ask is not registered to this app",
+			zap.String("app", sa.ApplicationID),
+			zap.String("allocKey", allocKey))
+		return fmt.Errorf("reservation creation failed ask %s not found on appID %s", allocKey, sa.ApplicationID)
+	}
+	if !sa.canAskReserve(ask) {
+		return fmt.Errorf("reservation of ask exceeds pending repeat, pending ask repeat %d", ask.GetPendingAskRepeat())
+	}
+	// check if we can reserve the node before reserving on the app
+	if err := node.Reserve(sa, ask); err != nil {
+		return err
+	}
+	sa.reservations[nodeReservation.getKey()] = nodeReservation
+	log.Logger().Info("reservation added successfully", zap.String("node", node.NodeID),
+		zap.String("app", ask.ApplicationID), zap.String("ask", ask.AllocationKey))
+	return nil
+}
+
+// UnReserve the application for this node and ask combination.
+// This first removes the reservation from the node.
+// If the reservation does not exist it returns 0 for reservations removed, if the reservation is removed it returns 1.
+// The error is set if the reservation key cannot be removed from the app or node.
+func (sa *Application) UnReserve(node *Node, ask *AllocationAsk) (int, error) {
+	sa.Lock()
+	defer sa.Unlock()
+	return sa.unReserveInternal(node, ask)
+}
+
+// Unlocked version of UnReserve that does the actual work.
+// Must only be called while holding the application lock.
+func (sa *Application) unReserveInternal(node *Node, ask *AllocationAsk) (int, error) {
+	resKey := reservationKey(node, nil, ask)
+	if resKey == "" {
+		log.Logger().Debug("unreserve reservation key create failed unexpectedly",
+			zap.String("appID", sa.ApplicationID),
+			zap.Any("node", node),
+			zap.Any("ask", ask))
+		return 0, fmt.Errorf("reservation key failed node or ask are nil for appID %s", sa.ApplicationID)
+	}
+	// unReserve the node before removing from the app
+	var num int
+	var err error
+	if num, err = node.unReserve(sa, ask); err != nil {
+		return 0, err
+	}
+	// if the unreserve worked on the node check the app
+	if _, found := sa.reservations[resKey]; found {
+		// the node call succeeded: the reservation was either found or not, but there was no error; log the difference here
+		if num == 0 {
+			log.Logger().Info("reservation not found while removing from node, app has reservation",
+				zap.String("appID", sa.ApplicationID),
+				zap.String("nodeID", node.NodeID),
+				zap.String("ask", ask.AllocationKey))
+		}
+		delete(sa.reservations, resKey)
+		log.Logger().Info("reservation removed successfully", zap.String("node", node.NodeID),
+			zap.String("app", ask.ApplicationID), zap.String("ask", ask.AllocationKey))
+		return 1, nil
+	}
+	// reservation was not found
+	log.Logger().Info("reservation not found while removing from app",
+		zap.String("appID", sa.ApplicationID),
+		zap.String("nodeID", node.NodeID),
+		zap.String("ask", ask.AllocationKey),
+		zap.Int("nodeReservationsRemoved", num))
+	return 0, nil
+}
+
+// Return the allocation reservations on any node.
+// The returned array is 0 or more keys into the reservations map.
+// Lock free: must be called while holding the application lock
+func (sa *Application) IsAskReserved(allocKey string) []string {
+	reservationKeys := make([]string, 0)
+	if allocKey == "" {
+		return reservationKeys
+	}
+	for key := range sa.reservations {
+		if strings.HasSuffix(key, allocKey) {
+			reservationKeys = append(reservationKeys, key)
+		}
+	}
+	return reservationKeys
+}
+
+// Check if the allocation has already been reserved. An ask can reserve multiple nodes if the request has a repeat set
+// larger than 1. It can never reserve more than the repeat number of nodes.
+// Lock free: must be called while holding the application lock
+func (sa *Application) canAskReserve(ask *AllocationAsk) bool {
+	allocKey := ask.AllocationKey
+	pending := int(ask.GetPendingAskRepeat())
+	resNumber := sa.IsAskReserved(allocKey)
+	if len(resNumber) >= pending {
+		log.Logger().Debug("reservation exceeds repeats",
+			zap.String("askKey", allocKey),
+			zap.Int("askPending", pending),
+			zap.Int("askReserved", len(resNumber)))
+	}
+	return pending > len(resNumber)
+}
+
+// Sort the requests for the app in order based on the priority of the request.
+// The sorted list only contains candidates that have an outstanding repeat.
+// Lock free: must be called while holding the application lock
+func (sa *Application) sortRequests(ascending bool) {
+	sa.sortedRequests = nil
+	for _, request := range sa.requests {
+		if request.GetPendingAskRepeat() == 0 {
+			continue
+		}
+		sa.sortedRequests = append(sa.sortedRequests, request)
+	}
+	// we might not have any requests
+	if len(sa.sortedRequests) > 0 {
+		sortAskByPriority(sa.sortedRequests, ascending)
+	}
+}
+
+func (sa *Application) getOutstandingRequests(headRoom *resources.Resource, total *[]*AllocationAsk) {
+	sa.RLock()
+	defer sa.RUnlock()
+
+	// make sure the requests are sorted
+	sa.sortRequests(false)
+	for _, request := range sa.sortedRequests {
+		if headRoom == nil || resources.FitIn(headRoom, request.AllocatedResource) {
+			// if headroom is still enough for the resources
+			*total = append(*total, request)
+			if headRoom != nil {
+				headRoom.SubFrom(request.AllocatedResource)
+			}
+		}
+	}
+}
+
+// Try a regular allocation of the pending requests
+func (sa *Application) tryAllocate(headRoom *resources.Resource, nodeIterator func() interfaces.NodeIterator) *Allocation {
+	sa.Lock()
+	defer sa.Unlock()
+	// make sure the requests are sorted
+	sa.sortRequests(false)
+	// get all the requests from the app sorted in order
+	for _, request := range sa.sortedRequests {
+		// resource must fit in headroom otherwise skip the request
+		if !resources.FitIn(headRoom, request.AllocatedResource) {
+			// post scheduling events via the event plugin
+			if eventCache := events.GetEventCache(); eventCache != nil {
+				message := fmt.Sprintf("Application %s does not fit into %s queue", request.ApplicationID, sa.QueueName)
+				if event, err := events.CreateRequestEventRecord(request.AllocationKey, request.ApplicationID, "InsufficientQueueResources", message); err != nil {
+					log.Logger().Warn("Event creation failed",
+						zap.String("event message", message),
+						zap.Error(err))
+				} else {
+					eventCache.AddEvent(event)
+				}
+			}
+			continue
+		}
+		iterator := nodeIterator()
+		if iterator != nil {
+			alloc := sa.tryNodes(request, iterator)
+			// have a candidate return it
+			if alloc != nil {
+				return alloc
+			}
+		}
+	}
+	// no requests fit, skip to next app
+	return nil
+}
+
+// Try a reserved allocation of an outstanding reservation
+func (sa *Application) tryReservedAllocate(headRoom *resources.Resource, nodeIterator func() interfaces.NodeIterator) *Allocation {
+	sa.Lock()
+	defer sa.Unlock()
+	// process all outstanding reservations and pick the first one that fits
+	for _, reserve := range sa.reservations {
+		ask := sa.requests[reserve.askKey]
+		// sanity check and cleanup if needed
+		if ask == nil || ask.GetPendingAskRepeat() == 0 {
+			var unreserveAsk *AllocationAsk
+			// if the ask was not found we need to construct one to unreserve
+			if ask == nil {
+				unreserveAsk = &AllocationAsk{
+					AllocationKey: reserve.askKey,
+					ApplicationID: sa.ApplicationID,
+				}
+			} else {
+				unreserveAsk = ask
+			}
+			// remove the reservation as this should not be reserved
+			alloc := newReservedAllocation(Unreserved, reserve.nodeID, unreserveAsk)
+			return alloc
+		}
+		// check if this fits in the queue's head room
+		if !resources.FitIn(headRoom, ask.AllocatedResource) {
+			continue
+		}
+		// check allocation possibility
+		alloc := sa.tryNode(reserve.node, ask)
+		// allocation worked fix the result and return
+		if alloc != nil {
+			alloc.Result = AllocatedReserved
+			return alloc
+		}
+	}
+	// lets try this on all other nodes
+	for _, reserve := range sa.reservations {
+		iterator := nodeIterator()
+		if iterator != nil {
+			alloc := sa.tryNodesNoReserve(reserve.ask, iterator, reserve.nodeID)
+			// have a candidate return it, including the node that was reserved
+			if alloc != nil {
+				return alloc
+			}
+		}
+	}
+	return nil
+}
+
+// Try all the nodes for a reserved request that have not been tried yet.
+// This should never result in a reservation as the ask is already reserved
+func (sa *Application) tryNodesNoReserve(ask *AllocationAsk, iterator interfaces.NodeIterator, reservedNode string) *Allocation {
+	for iterator.HasNext() {
+		node, ok := iterator.Next().(*Node)
+		if !ok {
+			log.Logger().Debug("Node iterator failed to return a node")
+			return nil
+		}
+		// skip over the node if the resource does not fit the node or this is the reserved node.
+		if !node.FitInNode(ask.AllocatedResource) || node.NodeID == reservedNode {
+			continue
+		}
+		alloc := sa.tryNode(node, ask)
+		// allocation worked: update result and return
+		if alloc != nil {
+			alloc.ReservedNodeID = reservedNode
+			alloc.Result = AllocatedReserved
+			return alloc
+		}
+	}
+	// ask does not fit, skip to next ask
+	return nil
+}
+
+// Try all the nodes for a request. The result is an allocation or reservation of a node.
+// New allocations can only be reserved after a delay.
+func (sa *Application) tryNodes(ask *AllocationAsk, iterator interfaces.NodeIterator) *Allocation {
+	var nodeToReserve *Node
+	scoreReserved := math.Inf(1)
+	// check if the ask is reserved or not
+	allocKey := ask.AllocationKey
+	reservedAsks := sa.IsAskReserved(allocKey)
+	allowReserve := len(reservedAsks) < int(ask.pendingRepeatAsk)
+	for iterator.HasNext() {
+		node, ok := iterator.Next().(*Node)
+		if !ok {
+			log.Logger().Debug("Node iterator failed to return a node")
+			return nil
+		}
+		// skip over the node if the resource does not fit the node at all.
+		if !node.FitInNode(ask.AllocatedResource) {
+			continue
+		}
+		alloc := sa.tryNode(node, ask)
+		// allocation worked so return
+		if alloc != nil {
+			// check if the node was reserved for this ask: if it is set the result and return
+			// NOTE: this is a safeguard as reserved nodes should never be part of the iterator
+			// but we have no locking
+			if _, ok = sa.reservations[reservationKey(node, nil, ask)]; ok {
+				log.Logger().Debug("allocate found reserved ask during non reserved allocate",
+					zap.String("appID", sa.ApplicationID),
+					zap.String("nodeID", node.NodeID),
+					zap.String("allocationKey", allocKey))
+				alloc.Result = AllocatedReserved
+				return alloc
+			}
+			// we could also have a different node reserved for this ask: if so pick one of
+			// the reserved nodes to unreserve (the first one in the list)
+			if len(reservedAsks) > 0 {
+				nodeID := strings.TrimSuffix(reservedAsks[0], "|"+allocKey)
+				log.Logger().Debug("allocate picking reserved ask during non reserved allocate",
+					zap.String("appID", sa.ApplicationID),
+					zap.String("nodeID", nodeID),
+					zap.String("allocationKey", allocKey))
+				alloc.Result = AllocatedReserved
+				alloc.ReservedNodeID = nodeID
+				return alloc
+			}
+			// nothing reserved just return this as a normal alloc
+			return alloc
+		}
+		// nothing allocated should we look at a reservation?
+		// TODO make this smarter a hardcoded delay is not the right thing
+		askAge := time.Since(ask.GetCreateTime())
+		if allowReserve && askAge > reservationDelay {
+			log.Logger().Debug("app reservation check",
+				zap.String("allocationKey", allocKey),
+				zap.Time("createTime", ask.GetCreateTime()),
+				zap.Duration("askAge", askAge),
+				zap.Duration("reservationDelay", reservationDelay))
+			score := ask.AllocatedResource.FitInScore(node.GetAvailableResource())
+			// Record the so-far best node to reserve
+			if score < scoreReserved {
+				scoreReserved = score
+				nodeToReserve = node
+			}
+		}
+	}
+	// we have not allocated yet, check if we should reserve
+	// NOTE: the node should not be reserved as the iterator filters them but we do not lock the nodes
+	if nodeToReserve != nil && !nodeToReserve.IsReserved() {
+		log.Logger().Debug("found candidate node for app reservation",
+			zap.String("appID", sa.ApplicationID),
+			zap.String("nodeID", nodeToReserve.NodeID),
+			zap.String("allocationKey", allocKey),
+			zap.Int("reservations", len(reservedAsks)),
+			zap.Int32("pendingRepeats", ask.pendingRepeatAsk))
+		// skip the node if conditions can not be satisfied
+		if !nodeToReserve.preReserveConditions(allocKey) {
+			return nil
+		}
+		// return reservation allocation and mark it as a reservation
+		alloc := newReservedAllocation(Reserved, nodeToReserve.NodeID, ask)
+		return alloc
+	}
+	// ask does not fit, skip to next ask
+	return nil
+}
+
+// Try allocating on one specific node
+func (sa *Application) tryNode(node *Node, ask *AllocationAsk) *Allocation {
+	allocKey := ask.AllocationKey
+	toAllocate := ask.AllocatedResource
+	// create the key for the reservation
+	if err := node.preAllocateCheck(toAllocate, reservationKey(nil, sa, ask), false); err != nil {
+		// skip schedule onto node
+		return nil
+	}
+	// skip the node if conditions can not be satisfied
+	if !node.preAllocateConditions(allocKey) {
+		return nil
+	}
+	// everything OK really allocate
+	alloc := NewAllocation(common.GetNewUUID(), node.NodeID, ask)
+	if node.AddAllocation(alloc) {
+		// mark this ask as allocated by lowering the repeat
+		_, err := sa.updateAskRepeatInternal(ask, -1)
+		if err != nil {
+			log.Logger().Debug("ask repeat update failed unexpectedly",
+				zap.Error(err))
+		}
+		if err = sa.queue.IncAllocatedResource(alloc.AllocatedResource, false); err != nil {

Review comment:
       What if the earlier call `sa.updateAskRepeatInternal(ask, -1)` was successful, but this line goes wrong?
   That means we have already deducted the ask repeat, however we do not have an allocation for it. It looks like this will leave the application in an inconsistent state.
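
   A minimal sketch of one way to keep the two updates consistent, assuming the repeat can simply be re-added as a compensation step (hypothetical, not part of this PR); the allocation added to the node just above would also need to be removed:

```go
// Sketch only: roll back the repeat decrement when the queue update fails,
// so the ask repeat and the tracked allocations stay in sync.
if err = sa.queue.IncAllocatedResource(alloc.AllocatedResource, false); err != nil {
	log.Logger().Warn("queue update failed, rolling back ask repeat",
		zap.Error(err))
	// hypothetical compensation: restore the repeat that was just deducted
	if _, rbErr := sa.updateAskRepeatInternal(ask, 1); rbErr != nil {
		log.Logger().Warn("failed to restore ask repeat",
			zap.Error(rbErr))
	}
	// the allocation should also be removed from the node here (not shown)
	return nil
}
```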
   

##########
File path: pkg/scheduler/objects/queue.go
##########
@@ -0,0 +1,1009 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package objects
+
+import (
+	"fmt"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/looplab/fsm"
+	"go.uber.org/zap"
+
+	"github.com/apache/incubator-yunikorn-core/pkg/common/configs"
+	"github.com/apache/incubator-yunikorn-core/pkg/common/resources"
+	"github.com/apache/incubator-yunikorn-core/pkg/common/security"
+	"github.com/apache/incubator-yunikorn-core/pkg/interfaces"
+	"github.com/apache/incubator-yunikorn-core/pkg/log"
+	"github.com/apache/incubator-yunikorn-core/pkg/metrics"
+	"github.com/apache/incubator-yunikorn-core/pkg/scheduler/policies"
+	"github.com/apache/incubator-yunikorn-core/pkg/webservice/dao"
+)
+
+const appTagNamespaceResourceQuota = "namespace.resourcequota"
+
+// Represents Queue inside Scheduler
+type Queue struct {
+	QueuePath string // Fully qualified path for the queue
+	Name      string // Queue name as in the config etc.
+
+	// Private fields need protection
+	sortType     policies.SortPolicy     // How applications (leaf) or queues (parents) are sorted
+	children     map[string]*Queue       // Only for direct children, parent queue only
+	applications map[string]*Application // only for leaf queue
+	reservedApps map[string]int          // applications reserved within this queue, with reservation count
+	parent       *Queue                  // link back to the parent in the scheduler
+	preempting   *resources.Resource     // resource considered for preemption in the queue
+	pending      *resources.Resource     // pending resource for the apps in the queue
+
+	// The queue properties should be treated as immutable: the value is a merge of the
+	// parent properties with the config for this queue, only manipulated during creation
+	// of the queue or via a queue configuration update.
+	properties         map[string]string
+	adminACL           security.ACL        // admin ACL
+	submitACL          security.ACL        // submit ACL
+	maxResource        *resources.Resource // When not set, max = nil
+	guaranteedResource *resources.Resource // When not set, Guaranteed == 0
+	allocatedResource  *resources.Resource // set based on allocation
+	isLeaf             bool                // this is a leaf queue or not (i.e. parent)
+	isManaged          bool                // queue is part of the config, not auto created
+	stateMachine       *fsm.FSM            // the state of the queue for scheduling
+	stateTime          time.Time           // last time the state was updated (needed for cleanup)
+
+	sync.RWMutex
+}
+
+func newBlankQueue() *Queue {
+	return &Queue{
+		children:          make(map[string]*Queue),
+		applications:      make(map[string]*Application),
+		reservedApps:      make(map[string]int),
+		properties:        make(map[string]string),
+		stateMachine:      NewObjectState(),
+		allocatedResource: resources.NewResource(),
+		preempting:        resources.NewResource(),
+		pending:           resources.NewResource(),
+	}
+}
+
+// Create a new queue from scratch based on the configuration
+// lock free as it cannot be referenced yet
+func NewConfiguredQueue(conf configs.QueueConfig, parent *Queue) (*Queue, error) {
+	sq := newBlankQueue()
+	sq.Name = strings.ToLower(conf.Name)
+	sq.QueuePath = strings.ToLower(conf.Name)
+	sq.parent = parent
+	sq.isManaged = true
+
+	// update the properties
+	if err := sq.setQueueConfig(conf); err != nil {
+		return nil, fmt.Errorf("configured queue creation failed: %s", err)
+	}
+
+	// add to the parent, we might have an overall lock already
+	// still need to make sure we lock the parent so we do not interfere with scheduling
+	if parent != nil {
+		sq.QueuePath = parent.QueuePath + configs.DOT + sq.Name
+		err := parent.addChildQueue(sq)
+		if err != nil {
+			return nil, fmt.Errorf("configured queue creation failed: %s", err)
+		}
+		// pull the properties from the parent that should be set on the child
+		sq.mergeProperties(parent.getProperties(), conf.Properties)
+	}
+	sq.UpdateSortType()
+
+	log.Logger().Debug("configured queue added to scheduler",
+		zap.String("queueName", sq.QueuePath))
+
+	return sq, nil
+}
+
+// Add a new queue to the system based on the placement rules
+// A dynamically added queue can never be the root queue so parent must be set
+// lock free as it cannot be referenced yet
+func NewDynamicQueue(name string, leaf bool, parent *Queue) (*Queue, error) {
+	// fail without a parent
+	if parent == nil {
+		return nil, fmt.Errorf("dynamic queue can not be added without parent: %s", name)
+	}
+	// the name might not have been checked yet, do it here
+	if !configs.QueueNameRegExp.MatchString(name) {
+		return nil, fmt.Errorf("invalid queue name %s, a name must only have alphanumeric characters,"+
+			" - or _, and be no longer than 64 characters", name)
+	}
+	sq := newBlankQueue()
+	sq.Name = strings.ToLower(name)
+	sq.QueuePath = parent.QueuePath + configs.DOT + sq.Name
+	sq.parent = parent
+	sq.isManaged = false
+	sq.isLeaf = leaf
+
+	// add to the parent, we might have a partition lock already
+	// still need to make sure we lock the parent so we do not interfere with scheduling
+	err := parent.addChildQueue(sq)
+	if err != nil {
+		return nil, fmt.Errorf("dynamic queue creation failed: %s", err)
+	}
+	// pull the properties from the parent that should be set on the child
+	sq.setTemplateProperties(parent.getProperties())
+	sq.UpdateSortType()
+	log.Logger().Debug("dynamic queue added to scheduler",
+		zap.String("queueName", sq.QueuePath))
+
+	return sq, nil
+}
+
+// Return a copy of the properties for this queue
+// Will never return a nil, can return an empty map.
+func (sq *Queue) getProperties() map[string]string {
+	sq.Lock()
+	defer sq.Unlock()
+	props := make(map[string]string)
+	for key, value := range sq.properties {
+		props[key] = value
+	}
+	return props
+}
+
+// Merge the properties from the parent queue and the config into the property set of the new queue
+// lock free call
+func (sq *Queue) mergeProperties(parent, config map[string]string) {
+	// clean out all existing values (handles update case)
+	sq.properties = make(map[string]string)
+	// Set the parent properties
+	if len(parent) != 0 {
+		for key, value := range parent {
+			sq.properties[key] = value
+		}
+	}
+	// merge the config properties
+	if len(config) > 0 {
+		for key, value := range config {
+			sq.properties[key] = value
+		}
+	}
+}
+
+// Set the properties that the dynamic child queue inherits from the parent
+// The properties list for the parent must be retrieved using getProperties()
+// This currently only sets the sort policy as it is set on the parent
+// Further implementation is part of YUNIKORN-193
+// lock free call
+func (sq *Queue) setTemplateProperties(parent map[string]string) {
+	// for a leaf queue pull out all values from the template and set each of them
+	// See YUNIKORN-193: for now just copy one attr from parent
+	if sq.isLeaf {
+		sq.properties[configs.ApplicationSortPolicy] = parent[configs.ApplicationSortPolicy]
+	}
+	// for a parent queue we just copy the template from its parent (no need to be recursive)
+	// this stops at the first managed queue
+	// See YUNIKORN-193
+}
+
+func (sq *Queue) SetQueueConfig(conf configs.QueueConfig) error {
+	sq.Lock()
+	defer sq.Unlock()
+	return sq.setQueueConfig(conf)
+}
+
+// Apply all the properties to the queue from the config
+// lock free call, must be called holding the queue lock or during create only
+func (sq *Queue) setQueueConfig(conf configs.QueueConfig) error {
+	// Set the ACLs
+	var err error
+	sq.submitACL, err = security.NewACL(conf.SubmitACL)
+	if err != nil {
+		log.Logger().Error("parsing submit ACL failed this should not happen",
+			zap.Error(err))
+		return err
+	}
+	sq.adminACL, err = security.NewACL(conf.AdminACL)
+	if err != nil {
+		log.Logger().Error("parsing admin ACL failed this should not happen",
+			zap.Error(err))
+		return err
+	}
+	// Change from unmanaged to managed
+	if !sq.isManaged {
+		log.Logger().Info("changed dynamic queue to managed",
+			zap.String("queue", sq.QueuePath))
+		sq.isManaged = true
+	}
+
+	sq.isLeaf = !conf.Parent
+	// Make sure the parent flag is set correctly: config might expect auto parent type creation
+	if len(conf.Queues) > 0 {
+		sq.isLeaf = false
+	}
+
+	// Load the max resources
+	sq.maxResource, err = resources.NewResourceFromConf(conf.Resources.Max)
+	if err != nil {
+		log.Logger().Error("parsing failed on max resources this should not happen",
+			zap.Error(err))
+		return err
+	}
+	if len(sq.maxResource.Resources) == 0 || resources.IsZero(sq.maxResource) {
+		log.Logger().Debug("max resources config setting ignored: cannot set zero max resources")
+		sq.maxResource = nil
+	}
+
+	// Load the guaranteed resources
+	sq.guaranteedResource, err = resources.NewResourceFromConf(conf.Resources.Guaranteed)
+	if err != nil {
+		log.Logger().Error("parsing failed on guaranteed resources this should not happen",
+			zap.Error(err))
+		return err
+	}
+	if len(sq.guaranteedResource.Resources) == 0 || resources.IsZero(sq.guaranteedResource) {
+		log.Logger().Debug("guaranteed resources config setting ignored: guaranteed must be non-zero to take effect")
+		sq.guaranteedResource = nil
+	}
+
+	sq.properties = conf.Properties
+	return nil
+}
+
+// Update the sortType for the queue based on the current properties
+func (sq *Queue) UpdateSortType() {
+	sq.Lock()
+	defer sq.Unlock()
+	// set the defaults, override with what is in the configured properties
+	if sq.isLeaf {
+		// walk over all properties and process
+		var err error
+		sq.sortType = policies.Undefined
+		for key, value := range sq.properties {
+			if key == configs.ApplicationSortPolicy {
+				sq.sortType, err = policies.SortPolicyFromString(value)
+				if err != nil {
+					log.Logger().Debug("application sort property configuration error",
+						zap.Error(err))
+				}
+			}
+			// for now skip the rest just log them
+			log.Logger().Debug("queue property skipped",
+				zap.String("key", key),
+				zap.String("value", value))
+		}
+		// if it is not defined default to fifo
+		if sq.sortType == policies.Undefined {
+			sq.sortType = policies.FifoSortPolicy
+		}
+		return
+	}
+	// set the sorting type for parent queues
+	sq.sortType = policies.FairSortPolicy
+}
+
+func (sq *Queue) GetQueuePath() string {
+	sq.Lock()
+	defer sq.Unlock()
+	return sq.QueuePath
+}
+
+// Is the queue marked for deletion and can only handle existing application requests.
+// No new applications will be accepted.
+func (sq *Queue) IsDraining() bool {
+	return sq.stateMachine.Is(Draining.String())
+}
+
+// Is the queue in a normal active state.
+func (sq *Queue) IsRunning() bool {
+	return sq.stateMachine.Is(Active.String())
+}
+
+// Is the queue stopped, not active in scheduling at all.
+func (sq *Queue) IsStopped() bool {
+	return sq.stateMachine.Is(Stopped.String())
+}
+
+// Return the current state of the queue
+func (sq *Queue) CurrentState() string {
+	return sq.stateMachine.Current()
+}
+
+// Handle the state event for the queue.
+// The state machine handles the locking.
+func (sq *Queue) handleQueueEvent(event ObjectEvent) error {
+	err := sq.stateMachine.Event(event.String(), sq.QueuePath)
+	// err is nil the state transition was done
+	if err == nil {
+		sq.stateTime = time.Now()
+		return nil
+	}
+	// handle the same state transition not nil error (limit of fsm).
+	if err.Error() == noTransition {
+		return nil
+	}
+	return err
+}
+
+// Return the allocated resources for this queue
+func (sq *Queue) GetAllocatedResource() *resources.Resource {
+	sq.RLock()
+	defer sq.RUnlock()
+	return sq.allocatedResource.Clone()
+}
+
+// Return the guaranteed resource for the queue.
+func (sq *Queue) GetGuaranteedResource() *resources.Resource {
+	sq.RLock()
+	defer sq.RUnlock()
+	return sq.guaranteedResource
+}
+
+// Check if the user has access to the queue to submit an application recursively.
+// This will check the submit ACL and the admin ACL.
+func (sq *Queue) CheckSubmitAccess(user security.UserGroup) bool {
+	sq.RLock()
+	allow := sq.submitACL.CheckAccess(user) || sq.adminACL.CheckAccess(user)
+	sq.RUnlock()
+	if !allow && sq.parent != nil {
+		allow = sq.parent.CheckSubmitAccess(user)
+	}
+	return allow
+}
+
+// Check if the user has access to the queue for admin actions recursively.
+func (sq *Queue) CheckAdminAccess(user security.UserGroup) bool {
+	sq.RLock()
+	allow := sq.adminACL.CheckAccess(user)
+	sq.RUnlock()
+	if !allow && sq.parent != nil {
+		allow = sq.parent.CheckAdminAccess(user)
+	}
+	return allow
+}
+
+// Convert the queue hierarchy into an object for the webservice
+
+func (sq *Queue) GetQueueInfos() dao.QueueDAOInfo {
+	queueInfo := dao.QueueDAOInfo{}
+	for _, child := range sq.GetCopyOfChildren() {
+		queueInfo.ChildQueues = append(queueInfo.ChildQueues, child.GetQueueInfos())
+	}
+
+	// children are done we can now lock just this queue.
+	sq.RLock()
+	defer sq.RUnlock()
+	queueInfo.QueueName = sq.Name
+	queueInfo.Status = sq.stateMachine.Current()
+	queueInfo.Capacities = dao.QueueCapacity{
+		Capacity:     sq.guaranteedResource.DAOString(),
+		MaxCapacity:  sq.maxResource.DAOString(),
+		UsedCapacity: sq.allocatedResource.DAOString(),
+		AbsUsedCapacity: resources.CalculateAbsUsedCapacity(
+			sq.maxResource, sq.allocatedResource).DAOString(),
+	}
+	queueInfo.Properties = make(map[string]string)
+	for k, v := range sq.properties {
+		queueInfo.Properties[k] = v
+	}
+	return queueInfo
+}
+
+// Return the pending resources for this queue
+func (sq *Queue) GetPendingResource() *resources.Resource {
+	sq.RLock()
+	defer sq.RUnlock()
+	return sq.pending
+}
+
+// Update pending resource of this queue
+func (sq *Queue) incPendingResource(delta *resources.Resource) {
+	// update the parent
+	if sq.parent != nil {
+		sq.parent.incPendingResource(delta)
+	}
+	// update this queue
+	sq.Lock()
+	defer sq.Unlock()
+	sq.pending = resources.Add(sq.pending, delta)
+}
+
+// Remove pending resource of this queue
+func (sq *Queue) decPendingResource(delta *resources.Resource) {
+	// update the parent
+	if sq.parent != nil {
+		sq.parent.decPendingResource(delta)
+	}
+	// update this queue
+	sq.Lock()
+	defer sq.Unlock()
+	var err error
+	sq.pending, err = resources.SubErrorNegative(sq.pending, delta)
+	if err != nil {
+		log.Logger().Warn("Pending resources went negative",
+			zap.String("queueName", sq.QueuePath),
+			zap.Error(err))
+	}
+}
+
+// Add an app to the queue. All checks are assumed to have passed before we get here.
+// No update of pending resource is needed as it should not have any requests yet.
+// Replaces the existing application without further checks.
+func (sq *Queue) AddApplication(app *Application) {
+	sq.Lock()
+	defer sq.Unlock()
+	sq.applications[app.ApplicationID] = app
+	// YUNIKORN-199: update the quota from the namespace
+	// get the tag with the quota
+	quota := app.GetTag(appTagNamespaceResourceQuota)
+	if quota == "" {
+		return
+	}
+	// need to set a quota: convert json string to resource
+	res, err := resources.NewResourceFromString(quota)
+	if err != nil {
+		log.Logger().Error("application resource quota conversion failure",
+			zap.String("json quota string", quota),
+			zap.Error(err))
+		return
+	}
+	if !resources.StrictlyGreaterThanZero(res) {
+		log.Logger().Error("application resource quota has at least one 0 value: cannot set queue limit",
+			zap.String("maxResource", res.String()))
+		return
+	}
+	// set the quota
+	if sq.isManaged {
+		log.Logger().Warn("Trying to set max resources set on a queue that is not an unmanaged leaf",
+			zap.String("queueName", sq.QueuePath))
+		return
+	}
+	sq.maxResource = res
+}
+
+// Remove the app from the list of tracked applications. Make sure that the app
+// is assigned to this queue and not removed yet.
+// If not found this call is a noop
+func (sq *Queue) RemoveApplication(app *Application) {
+	// clean up any outstanding pending resources
+	appID := app.ApplicationID
+	if _, ok := sq.applications[appID]; !ok {
+		log.Logger().Debug("Application not found while removing from queue",
+			zap.String("queueName", sq.QueuePath),
+			zap.String("applicationID", appID))
+		return
+	}
+	if appPending := app.GetPendingResource(); !resources.IsZero(appPending) {
+		sq.decPendingResource(appPending)
+	}
+	// clean up the allocated resource
+	if appAllocated := app.GetAllocatedResource(); !resources.IsZero(appAllocated) {
+		// failures are logged in the decrement do not do it twice
+		//nolint:errcheck
+		_ = sq.DecAllocatedResource(appAllocated)
+	}
+	sq.Lock()
+	defer sq.Unlock()
+
+	delete(sq.applications, appID)
+}
+
+// Get a copy of all apps holding the lock
+func (sq *Queue) getCopyOfApps() map[string]*Application {
+	sq.RLock()
+	defer sq.RUnlock()
+	appsCopy := make(map[string]*Application)
+	for appID, app := range sq.applications {
+		appsCopy[appID] = app
+	}
+	return appsCopy
+}
+
+// Get a copy of the child queues
+// This is used by the partition manager to find all queues to clean. However, we cannot
+// guarantee that no new child is added while we clean up, since there is no overall
+// lock on the scheduler. We need to check just before removal to make sure the parent is empty
+func (sq *Queue) GetCopyOfChildren() map[string]*Queue {
+	sq.RLock()
+	defer sq.RUnlock()
+	childCopy := make(map[string]*Queue)
+	for k, v := range sq.children {
+		childCopy[k] = v
+	}
+	return childCopy
+}
+
+// Check if the queue is empty
+// A parent queue is empty when it has no children left
+// A leaf queue is empty when there are no applications left
+func (sq *Queue) IsEmpty() bool {
+	sq.RLock()
+	defer sq.RUnlock()
+	if sq.isLeaf {
+		return len(sq.applications) == 0
+	}
+	return len(sq.children) == 0
+}
+
+// Remove a child queue from this queue.
+// No checks are performed: if the child has been removed already it is a noop.
+// This may only be called by the queue removal itself on the registered parent.
+// Queue removal is always a bottom up action: leaves first then the parent.
+func (sq *Queue) removeChildQueue(name string) {
+	sq.Lock()
+	defer sq.Unlock()
+
+	delete(sq.children, name)
+}
+
+// Add a child queue to this queue.
+func (sq *Queue) addChildQueue(child *Queue) error {
+	sq.Lock()
+	defer sq.Unlock()
+	if sq.isLeaf {
+		return fmt.Errorf("cannot add a child queue to a leaf queue: %s", sq.QueuePath)
+	}
+	if sq.IsDraining() {
+		return fmt.Errorf("cannot add a child queue when queue is marked for deletion: %s", sq.QueuePath)
+	}
+
+	// no need to lock child as it is a new queue which cannot be accessed yet
+	sq.children[child.Name] = child
+	return nil
+}
+
+// Mark the managed queue for removal from the system.
+// This can be executed multiple times and is only effective the first time.
+// This is a noop on an unmanaged queue
+func (sq *Queue) MarkQueueForRemoval() {
+	// need to lock for write as we don't want to add a queue while marking for removal
+	sq.Lock()
+	defer sq.Unlock()
+	// Mark the managed queue for deletion: it is removed from the config let it drain.
+	// Also mark all the managed children for deletion.
+	if sq.isManaged {
+		log.Logger().Info("marking managed queue for deletion",
+			zap.String("queue", sq.QueuePath))
+		if err := sq.handleQueueEvent(Remove); err != nil {
+			log.Logger().Info("failed to mark managed queue for deletion",
+				zap.String("queue", sq.QueuePath),
+				zap.Error(err))
+		}
+		if len(sq.children) > 0 {
+			for _, child := range sq.children {
+				child.MarkQueueForRemoval()
+			}
+		}
+	}
+}
+
+// Get a child queue based on the name of the child.
+func (sq *Queue) GetChildQueue(name string) *Queue {
+	sq.RLock()
+	defer sq.RUnlock()
+
+	return sq.children[name]
+}
+
+// Remove the queue from the structure.
+// Since nothing is allocated there shouldn't be anything referencing this queue any more.
+// The real removal is removing the queue from the parent's child list, use read lock on the queue
+func (sq *Queue) RemoveQueue() bool {
+	sq.RLock()
+	defer sq.RUnlock()
+	// cannot remove a managed queue that is running
+	if sq.isManaged && sq.IsRunning() {
+		return false
+	}
+	// cannot remove a queue that has children or applications assigned
+	if len(sq.children) > 0 || len(sq.applications) > 0 {
+		return false
+	}
+	log.Logger().Info("removing queue", zap.String("queue", sq.QueuePath))
+	// root is always managed and is the only queue with a nil parent: no need to guard
+	sq.parent.removeChildQueue(sq.Name)
+	return true
+}
+
+// Is this queue a leaf or not (i.e. a parent)
+func (sq *Queue) IsLeafQueue() bool {
+	sq.RLock()
+	defer sq.RUnlock()
+	return sq.isLeaf
+}
+
+// Is this queue managed or not.
+func (sq *Queue) IsManaged() bool {
+	sq.RLock()
+	defer sq.RUnlock()
+	return sq.isManaged
+}
+
+// test only
+func (sq *Queue) isRoot() bool {
+	return sq.parent == nil
+}
+
+// Return the preempting resources for the queue
+func (sq *Queue) GetPreemptingResource() *resources.Resource {
+	sq.RLock()
+	defer sq.RUnlock()
+	return sq.preempting
+}
+
+// Increment the resources marked for preemption in the queue.
+func (sq *Queue) IncPreemptingResource(newAlloc *resources.Resource) {
+	sq.Lock()
+	defer sq.Unlock()
+	sq.preempting.AddTo(newAlloc)
+}
+
+// Decrement the resources marked for preemption in the queue.
+func (sq *Queue) decPreemptingResource(newAlloc *resources.Resource) {
+	sq.Lock()
+	defer sq.Unlock()
+	var err error
+	sq.preempting, err = resources.SubErrorNegative(sq.preempting, newAlloc)
+	if err != nil {
+		log.Logger().Warn("Preempting resources went negative",
+			zap.String("queueName", sq.QueuePath),
+			zap.Error(err))
+	}
+}
+
+// (Re)Set the preempting resources for the queue.
+// This could be because they are preempted, or the preemption was cancelled.
+func (sq *Queue) setPreemptingResource(newAlloc *resources.Resource) {
+	sq.Lock()
+	defer sq.Unlock()
+	sq.preempting = newAlloc
+}
+
+// Increment the allocated resources for this queue (recursively)
+// Guard against going over max resources if set
+func (sq *Queue) IncAllocatedResource(alloc *resources.Resource, nodeReported bool) error {
+	sq.Lock()
+	defer sq.Unlock()
+
+	// check this queue: failure stops checks if the allocation is not part of a node addition
+	newAllocated := resources.Add(sq.allocatedResource, alloc)
+	if !nodeReported {
+		if sq.maxResource != nil && !resources.FitIn(sq.maxResource, newAllocated) {
+			return fmt.Errorf("allocation (%v) puts queue %s over maximum allocation (%v)",
+				alloc, sq.QueuePath, sq.maxResource)
+		}
+	}
+	// check the parent: need to pass before updating
+	if sq.parent != nil {
+		if err := sq.parent.IncAllocatedResource(alloc, nodeReported); err != nil {
+			log.Logger().Error("parent queue exceeds maximum resource",
+				zap.Any("allocationId", alloc),
+				zap.String("maxResource", sq.maxResource.String()),
+				zap.Error(err))
+			return err
+		}
+	}
+	// all OK update this queue
+	sq.allocatedResource = newAllocated
+	sq.updateUsedResourceMetrics()
+	return nil
+}
+
+// Decrement the allocated resources for this queue (recursively)
+// Guard against going below zero resources.
+func (sq *Queue) DecAllocatedResource(alloc *resources.Resource) error {
+	sq.Lock()
+	defer sq.Unlock()
+
+	// check this queue: failure stops checks
+	if alloc != nil && !resources.FitIn(sq.allocatedResource, alloc) {
+		return fmt.Errorf("released allocation (%v) is larger than '%s' queue allocation (%v)",
+			alloc, sq.QueuePath, sq.allocatedResource)
+	}
+	// check the parent: need to pass before updating
+	if sq.parent != nil {
+		if err := sq.parent.DecAllocatedResource(alloc); err != nil {
+			log.Logger().Error("released allocation is larger than parent queue allocated resource",
+				zap.Any("allocationId", alloc),
+				zap.Any("parent allocatedResource", sq.parent.GetAllocatedResource()),
+				zap.Error(err))
+			return err
+		}
+	}
+	// all OK update the queue
+	sq.allocatedResource = resources.Sub(sq.allocatedResource, alloc)
+	sq.updateUsedResourceMetrics()
+	return nil
+}
+
+// Return a sorted copy of the applications in the queue. Applications are sorted using the
+// sorting type of the queue.
+// Only applications with a pending resource request are considered.
+// Lock free call all locks are taken when needed in called functions
+func (sq *Queue) sortApplications() []*Application {
+	if !sq.IsLeafQueue() {
+		return nil
+	}
+	// Sort the applications
+	return sortApplications(sq.getCopyOfApps(), sq.getSortType(), sq.GetGuaranteedResource())
+}
+
+// Return a sorted copy of the queues for this parent queue.
+// Only queues with a pending resource request are considered. The queues are sorted using the
+// sorting type for the parent queue.
+// Lock free call all locks are taken when needed in called functions
+func (sq *Queue) sortQueues() []*Queue {
+	if sq.IsLeafQueue() {
+		return nil
+	}
+	// Create a list of the queues with pending resources
+	sortedQueues := make([]*Queue, 0)
+	for _, child := range sq.GetCopyOfChildren() {
+		// a stopped queue cannot be scheduled
+		if child.IsStopped() {
+			continue
+		}
+		// queue must have pending resources to be considered for scheduling
+		if resources.StrictlyGreaterThanZero(child.GetPendingResource()) {
+			sortedQueues = append(sortedQueues, child)
+		}
+	}
+	// Sort the queues
+	sortQueue(sortedQueues, sq.getSortType())
+
+	return sortedQueues
+}
+
+// Get the headroom for the queue; this should never be more than the headroom for the parent.
+// In case there are no nodes in a newly started cluster and no queues have a limit configured this call
+// will return nil.
+// NOTE: if a resource quantity is missing and a limit is defined the missing quantity will be seen as a limit of 0.
+// When defining a limit you therefore should define all resource quantities.
+func (sq *Queue) getHeadRoom() *resources.Resource {
+	var parentHeadRoom *resources.Resource
+	if sq.parent != nil {
+		parentHeadRoom = sq.parent.getHeadRoom()
+	}
+	return sq.internalHeadRoom(parentHeadRoom)
+}
+
+// this function returns the max headRoom of a queue
+// this does not take the partition resources into consideration
+func (sq *Queue) getMaxHeadRoom() *resources.Resource {
+	var parentHeadRoom *resources.Resource
+	if sq.parent != nil {
+		parentHeadRoom = sq.parent.getMaxHeadRoom()
+	} else {
+		return nil
+	}
+	return sq.internalHeadRoom(parentHeadRoom)
+}
+
+func (sq *Queue) internalHeadRoom(parentHeadRoom *resources.Resource) *resources.Resource {
+	sq.RLock()
+	defer sq.RUnlock()
+	headRoom := sq.maxResource.Clone()
+
+	// if we have no max set headroom is always the same as the parent
+	if headRoom == nil {
+		return parentHeadRoom
+	}
+	// calculate unused
+	headRoom.SubFrom(sq.allocatedResource)
+	// check the minimum of the two: parentHeadRoom is nil for root
+	if parentHeadRoom == nil {
+		return headRoom
+	}
+	return resources.ComponentWiseMin(headRoom, parentHeadRoom)
+}
+
+// Get the max resource for the queue; this should never be more than the max for the parent.
+// The root queue always has its limit set to the total cluster size (dynamic based on node registration)
+// In case there are no nodes in a newly started cluster and no queues have a limit configured this call
+// will return nil.
+// NOTE: if a resource quantity is missing and a limit is defined the missing quantity will be seen as a limit of 0.
+// When defining a limit you therefore should define all resource quantities.
+func (sq *Queue) GetMaxResource() *resources.Resource {
+	// get the limit for the parent first and check against the queue's own
+	var limit *resources.Resource
+	if sq.parent != nil {
+		limit = sq.parent.GetMaxResource()
+	}
+	sq.RLock()
+	defer sq.RUnlock()
+	// no queue limit set, not even for root
+	if limit == nil {
+		if sq.maxResource == nil {
+			return nil
+		}
+		return sq.maxResource.Clone()
+	}
+	// parent limit set no queue limit return parent
+	if sq.maxResource == nil {
+		return limit
+	}
+	// calculate the smallest value for each type
+	return resources.ComponentWiseMin(limit, sq.maxResource)
+}
+
+// Set the max resource for root the queue.
+// Should only happen on the root, all other queues get it from the config via properties.
+func (sq *Queue) SetMaxResource(max *resources.Resource) {
+	sq.Lock()
+	defer sq.Unlock()
+
+	if sq.parent != nil {
+		log.Logger().Warn("Max resources set on a queue that is not the root",
+			zap.String("queueName", sq.QueuePath))
+		return
+	}
+	sq.maxResource = max.Clone()
+}
+
+// Try allocate pending requests. This only gets called if there is a pending request on this queue or its children.
+// This is a depth first algorithm: descend into the depth of the queue tree first. Child queues are sorted based on
+// the configured queue sortPolicy. Queues without pending resources are skipped.
+// Applications are sorted based on the application sortPolicy. Applications without pending resources are skipped.
+// Lock free call: all locks are taken when needed in the called functions
+func (sq *Queue) TryAllocate(iterator func() interfaces.NodeIterator) *Allocation {
+	if sq.IsLeafQueue() {
+		// get the headroom
+		headRoom := sq.getHeadRoom()
+		// process the apps (filters out app without pending requests)
+		for _, app := range sq.sortApplications() {
+			alloc := app.tryAllocate(headRoom, iterator)
+			if alloc != nil {
+				log.Logger().Debug("allocation found on queue",
+					zap.String("queueName", sq.QueuePath),
+					zap.String("appID", app.ApplicationID),
+					zap.String("allocation", alloc.String()))
+				return alloc
+			}
+		}
+	} else {
+		// process the child queues (filters out queues without pending requests)
+		for _, child := range sq.sortQueues() {
+			alloc := child.TryAllocate(iterator)
+			if alloc != nil {
+				return alloc
+			}
+		}
+	}
+	return nil
+}
+
+func (sq *Queue) GetQueueOutstandingRequests(total *[]*AllocationAsk) {
+	if sq.IsLeafQueue() {
+		headRoom := sq.getMaxHeadRoom()
+		for _, app := range sq.sortApplications() {
+			app.getOutstandingRequests(headRoom, total)
+		}
+	} else {
+		for _, child := range sq.sortQueues() {
+			child.GetQueueOutstandingRequests(total)
+		}
+	}
+}
+
+// Try allocate reserved requests. This only gets called if there is a pending request on this queue or its children.
+// This is a depth first algorithm: descend into the depth of the queue tree first. Child queues are sorted based on
+// the configured queue sortPolicy. Queues without pending resources are skipped.
+// Applications are currently NOT sorted and are iterated over in a random order.
+// Lock free call: all locks are taken when needed in the called functions
+func (sq *Queue) TryReservedAllocate(iterator func() interfaces.NodeIterator) *Allocation {
+	if sq.IsLeafQueue() {
+		// skip if it has no reservations
+		reservedCopy := sq.getReservedApps()
+		if len(reservedCopy) != 0 {
+			// get the headroom
+			headRoom := sq.getHeadRoom()
+			// process the apps
+			for appID, numRes := range reservedCopy {
+				if numRes > 1 {
+					log.Logger().Debug("multiple reservations found for application trying to allocate one",
+						zap.String("appID", appID),
+						zap.Int("reservations", numRes))
+				}
+				app := sq.getApplication(appID)

Review comment:
       I think we need a nil check here; if the app does not exist, we can simply return nil.
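   Something along these lines, just a sketch (the `app.tryReservedAllocate` call after the lookup is my assumption of how the loop continues):
   ```go
   app := sq.getApplication(appID)
   if app == nil {
       // the application is gone from the queue, nothing left to try for this reservation
       log.Logger().Debug("reservation(s) found but application no longer in queue",
           zap.String("appID", appID),
           zap.String("queueName", sq.QueuePath))
       return nil
   }
   // normal reserved allocation path continues from here
   alloc := app.tryReservedAllocate(headRoom, iterator)
   ```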

##########
File path: pkg/scheduler/context.go
##########
@@ -0,0 +1,761 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package scheduler
+
+import (
+	"fmt"
+	"math"
+	"sync"
+
+	"go.uber.org/zap"
+
+	"github.com/apache/incubator-yunikorn-core/pkg/common"
+	"github.com/apache/incubator-yunikorn-core/pkg/common/configs"
+	"github.com/apache/incubator-yunikorn-core/pkg/common/resources"
+	"github.com/apache/incubator-yunikorn-core/pkg/handler"
+	"github.com/apache/incubator-yunikorn-core/pkg/log"
+	"github.com/apache/incubator-yunikorn-core/pkg/metrics"
+	"github.com/apache/incubator-yunikorn-core/pkg/plugins"
+	"github.com/apache/incubator-yunikorn-core/pkg/rmproxy/rmevent"
+	"github.com/apache/incubator-yunikorn-core/pkg/scheduler/objects"
+	siCommon "github.com/apache/incubator-yunikorn-scheduler-interface/lib/go/common"
+	"github.com/apache/incubator-yunikorn-scheduler-interface/lib/go/si"
+)
+
+const disableReservation = "DISABLE_RESERVATION"
+
+type ClusterContext struct {
+	partitions     map[string]*PartitionContext
+	policyGroup    string
+	rmEventHandler handler.EventHandler
+
+	// config values that change scheduling behaviour
+	needPreemption      bool
+	reservationDisabled bool
+
+	sync.RWMutex
+}
+
+// Create a new cluster context to be used outside of the event system.
+// test only
+func NewClusterContext(rmID, policyGroup string) (*ClusterContext, error) {
+	// load the config this returns a validated configuration
+	conf, err := configs.SchedulerConfigLoader(policyGroup)
+	if err != nil {
+		return nil, err
+	}
+	// create the context and set the policyGroup
+	cc := &ClusterContext{
+		partitions:          make(map[string]*PartitionContext),
+		policyGroup:         policyGroup,
+		reservationDisabled: common.GetBoolEnvVar(disableReservation, false),
+	}
+	// If reservation is turned off set the reservation delay to the maximum duration defined.
+	// The time package does not export maxDuration so use the equivalent from the math package.
+	if cc.reservationDisabled {
+		objects.SetReservationDelay(math.MaxInt64)
+	}
+	err = cc.updateSchedulerConfig(conf, rmID)
+	if err != nil {
+		return nil, err
+	}
+	// update the global config
+	configs.ConfigContext.Set(policyGroup, conf)
+	return cc, nil
+}
+
+func newClusterContext() *ClusterContext {
+	cc := &ClusterContext{
+		partitions:          make(map[string]*PartitionContext),
+		reservationDisabled: common.GetBoolEnvVar(disableReservation, false),
+	}
+	// If reservation is turned off set the reservation delay to the maximum duration defined.
+	// The time package does not export maxDuration so use the equivalent from the math package.
+	if cc.reservationDisabled {
+		objects.SetReservationDelay(math.MaxInt64)
+	}
+	return cc
+}
+
+func (cc *ClusterContext) setEventHandler(rmHandler handler.EventHandler) {
+	cc.rmEventHandler = rmHandler
+}
+
+// The main scheduling routine.
+// Process each partition in the scheduler, walk over each queue and app to check if anything can be scheduled.
+// This can be forked into a go routine per partition if needed to increase parallel allocations
+func (cc *ClusterContext) schedule() {
+	// schedule each partition defined in the cluster
+	for _, psc := range cc.GetPartitionMapClone() {
+		// if there are no resources in the partition just skip
+		if psc.root.GetMaxResource() == nil {
+			continue
+		}
+		// try reservations first
+		alloc := psc.tryReservedAllocate()

Review comment:
       We are passing a node iterator to both `psc.tryReservedAllocate()` and `psc.tryAllocate()`,
   which means we sort the full node list twice in each scheduling cycle.
   It looks like we overlooked this in the past; I think we need to log a JIRA to improve this and ensure we only sort the nodes once per cycle.
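   For the JIRA, one possible direction (just a sketch; `getSortedNodes` and `newStaticNodeIterator` are made-up names, and both partition calls would need to accept the iterator factory):
   ```go
   // sort the node list once per scheduling cycle and reuse the same snapshot
   nodes := psc.getSortedNodes() // hypothetical: single sort per cycle
   iteratorFn := func() interfaces.NodeIterator {
       return newStaticNodeIterator(nodes) // hypothetical iterator over the snapshot
   }
   alloc := psc.tryReservedAllocate(iteratorFn)
   if alloc == nil {
       alloc = psc.tryAllocate(iteratorFn)
   }
   ```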

##########
File path: pkg/scheduler/context.go
##########
@@ -0,0 +1,761 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package scheduler
+
+import (
+	"fmt"
+	"math"
+	"sync"
+
+	"go.uber.org/zap"
+
+	"github.com/apache/incubator-yunikorn-core/pkg/common"
+	"github.com/apache/incubator-yunikorn-core/pkg/common/configs"
+	"github.com/apache/incubator-yunikorn-core/pkg/common/resources"
+	"github.com/apache/incubator-yunikorn-core/pkg/handler"
+	"github.com/apache/incubator-yunikorn-core/pkg/log"
+	"github.com/apache/incubator-yunikorn-core/pkg/metrics"
+	"github.com/apache/incubator-yunikorn-core/pkg/plugins"
+	"github.com/apache/incubator-yunikorn-core/pkg/rmproxy/rmevent"
+	"github.com/apache/incubator-yunikorn-core/pkg/scheduler/objects"
+	siCommon "github.com/apache/incubator-yunikorn-scheduler-interface/lib/go/common"
+	"github.com/apache/incubator-yunikorn-scheduler-interface/lib/go/si"
+)
+
+const disableReservation = "DISABLE_RESERVATION"
+
+type ClusterContext struct {
+	partitions     map[string]*PartitionContext
+	policyGroup    string
+	rmEventHandler handler.EventHandler
+
+	// config values that change scheduling behaviour
+	needPreemption      bool
+	reservationDisabled bool
+
+	sync.RWMutex
+}
+
+// Create a new cluster context to be used outside of the event system.
+// test only
+func NewClusterContext(rmID, policyGroup string) (*ClusterContext, error) {
+	// load the config this returns a validated configuration
+	conf, err := configs.SchedulerConfigLoader(policyGroup)
+	if err != nil {
+		return nil, err
+	}
+	// create the context and set the policyGroup
+	cc := &ClusterContext{
+		partitions:          make(map[string]*PartitionContext),
+		policyGroup:         policyGroup,
+		reservationDisabled: common.GetBoolEnvVar(disableReservation, false),
+	}
+	// If reservation is turned off set the reservation delay to the maximum duration defined.
+	// The time package does not export maxDuration so use the equivalent from the math package.
+	if cc.reservationDisabled {
+		objects.SetReservationDelay(math.MaxInt64)
+	}
+	err = cc.updateSchedulerConfig(conf, rmID)
+	if err != nil {
+		return nil, err
+	}
+	// update the global config
+	configs.ConfigContext.Set(policyGroup, conf)
+	return cc, nil
+}
+
+func newClusterContext() *ClusterContext {
+	cc := &ClusterContext{
+		partitions:          make(map[string]*PartitionContext),
+		reservationDisabled: common.GetBoolEnvVar(disableReservation, false),
+	}
+	// If reservation is turned off set the reservation delay to the maximum duration defined.
+	// The time package does not export maxDuration so use the equivalent from the math package.
+	if cc.reservationDisabled {
+		objects.SetReservationDelay(math.MaxInt64)
+	}
+	return cc
+}
+
+func (cc *ClusterContext) setEventHandler(rmHandler handler.EventHandler) {
+	cc.rmEventHandler = rmHandler
+}
+
+// The main scheduling routine.
+// Process each partition in the scheduler, walk over each queue and app to check if anything can be scheduled.
+// This can be forked into a go routine per partition if needed to increase parallel allocations
+func (cc *ClusterContext) schedule() {
+	// schedule each partition defined in the cluster
+	for _, psc := range cc.GetPartitionMapClone() {
+		// if there are no resources in the partition just skip
+		if psc.root.GetMaxResource() == nil {
+			continue
+		}
+		// try reservations first
+		alloc := psc.tryReservedAllocate()
+		// nothing reserved that can be allocated try normal allocate
+		if alloc == nil {
+			alloc = psc.tryAllocate()
+		}
+		if alloc != nil {
+			// communicate the allocation to the RM
+			cc.rmEventHandler.HandleEvent(&rmevent.RMNewAllocationsEvent{
+				Allocations: []*si.Allocation{alloc.NewSIFromAllocation()},
+				RmID:        psc.RmID,
+			})
+			// TODO: The alloc is just passed to the RM why do we need a callback?
+			// TODO: The comments are from before the cache and scheduler merge
+			// if reconcile plugin is enabled, re-sync the cache now.
+			// before deciding on an allocation, call the reconcile plugin to sync scheduler cache
+			// between core and shim if necessary. This is useful when running multiple allocations

Review comment:
       In the original design we wanted to keep the possibility of running multiple allocation threads,
   but now we allocate on a single thread, so this is no longer a problem.
   For the TODO comment here: the reason is that we need this call to refresh the shim-side cache. It must be made when an allocation is "assumed" on a node, so that subsequent scheduling cycles can take it into account while evaluating predicate functions, e.g. affinity/anti-affinity functions. @wilfred-s can we replace the TODO with the proper explanation?
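   Roughly the wording I have in mind for the replacement comment (adjust as needed):
   ```go
   // Notify the shim through the reconcile plugin so its cache marks this
   // allocation as "assumed" on the node. Subsequent scheduling cycles then
   // take the assumed allocation into account when evaluating predicate
   // functions such as affinity/anti-affinity, instead of waiting for the
   // actual bind to complete.
   ```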

##########
File path: pkg/scheduler/objects/application.go
##########
@@ -0,0 +1,926 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package objects
+
+import (
+	"fmt"
+	"math"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/looplab/fsm"
+	"go.uber.org/zap"
+
+	"github.com/apache/incubator-yunikorn-core/pkg/common"
+	"github.com/apache/incubator-yunikorn-core/pkg/common/resources"
+	"github.com/apache/incubator-yunikorn-core/pkg/common/security"
+	"github.com/apache/incubator-yunikorn-core/pkg/events"
+	"github.com/apache/incubator-yunikorn-core/pkg/handler"
+	"github.com/apache/incubator-yunikorn-core/pkg/interfaces"
+	"github.com/apache/incubator-yunikorn-core/pkg/log"
+	"github.com/apache/incubator-yunikorn-core/pkg/rmproxy/rmevent"
+	"github.com/apache/incubator-yunikorn-scheduler-interface/lib/go/si"
+)
+
+var (
+	reservationDelay = 2 * time.Second
+	startingTimeout  = time.Minute * 5
+)
+
+type Application struct {
+	ApplicationID  string
+	Partition      string
+	QueueName      string
+	SubmissionTime time.Time
+
+	// Private fields need protection
+	queue             *Queue                    // queue the application is running in
+	pending           *resources.Resource       // pending resources from asks for the app
+	reservations      map[string]*reservation   // a map of reservations
+	requests          map[string]*AllocationAsk // a map of asks
+	sortedRequests    []*AllocationAsk
+	user              security.UserGroup     // owner of the application
+	tags              map[string]string      // application tags used in scheduling
+	allocatedResource *resources.Resource    // total allocated resources
+	allocations       map[string]*Allocation // list of all allocations
+	stateMachine      *fsm.FSM               // application state machine
+	stateTimer        *time.Timer            // timer for state time
+
+	rmEventHandler handler.EventHandler
+	rmID           string
+
+	sync.RWMutex
+}
+
+func newBlankApplication(appID, partition, queueName string, ugi security.UserGroup, tags map[string]string) *Application {
+	return &Application{
+		ApplicationID:     appID,
+		Partition:         partition,
+		QueueName:         queueName,
+		SubmissionTime:    time.Now(),
+		user:              ugi,
+		tags:              tags,
+		pending:           resources.NewResource(),
+		allocatedResource: resources.NewResource(),
+		requests:          make(map[string]*AllocationAsk),
+		reservations:      make(map[string]*reservation),
+		allocations:       make(map[string]*Allocation),
+		stateMachine:      NewAppState(),
+	}
+}
+
+func NewApplication(appID, partition, queueName string, ugi security.UserGroup, tags map[string]string, eventHandler handler.EventHandler, rmID string) *Application {
+	app := newBlankApplication(appID, partition, queueName, ugi, tags)
+	app.rmEventHandler = eventHandler
+	app.rmID = rmID
+	return app
+}
+
+func (sa *Application) String() string {
+	if sa == nil {
+		return "application is nil"
+	}
+	return fmt.Sprintf("ApplicationID: %s, Partition: %s, QueueName: %s, SubmissionTime: %x",
+		sa.ApplicationID, sa.Partition, sa.QueueName, sa.SubmissionTime)
+}
+
+// Set the reservation delay.
+// Set when the cluster context is created to disable reservation.
+func SetReservationDelay(delay time.Duration) {
+	log.Logger().Debug("Set reservation delay",
+		zap.Duration("delay", delay))
+	reservationDelay = delay
+}
+
+// Return the current state or a checked specific state for the application.
+// The state machine handles the locking.
+func (sa *Application) CurrentState() string {
+	return sa.stateMachine.Current()
+}
+
+func (sa *Application) IsStarting() bool {
+	return sa.stateMachine.Is(Starting.String())
+}
+
+func (sa *Application) IsAccepted() bool {
+	return sa.stateMachine.Is(Accepted.String())
+}
+
+func (sa *Application) IsNew() bool {
+	return sa.stateMachine.Is(New.String())
+}
+
+func (sa *Application) IsRunning() bool {
+	return sa.stateMachine.Is(Running.String())
+}
+
+func (sa *Application) IsWaiting() bool {
+	return sa.stateMachine.Is(Waiting.String())
+}
+
+// Handle the state event for the application.
+// The state machine handles the locking.
+func (sa *Application) HandleApplicationEvent(event applicationEvent) error {
+	err := sa.stateMachine.Event(event.String(), sa)
+	// handle the non-nil error the fsm returns for a same-state transition (fsm limitation).
+	if err != nil && err.Error() == noTransition {
+		return nil
+	}
+	return err
+}
+
+func (sa *Application) OnStateChange(event *fsm.Event) {
+	updatedApps := make([]*si.UpdatedApplication, 0)
+	updatedApps = append(updatedApps, &si.UpdatedApplication{
+		ApplicationID:            sa.ApplicationID,
+		State:                    sa.stateMachine.Current(),
+		StateTransitionTimestamp: time.Now().UnixNano(),
+		Message:                  fmt.Sprintf("{Status change triggered by the event : %v}", event),
+	})
+
+	if sa.rmEventHandler != nil {
+		sa.rmEventHandler.HandleEvent(
+			&rmevent.RMApplicationUpdateEvent{
+				RmID:                 sa.rmID,
+				AcceptedApplications: make([]*si.AcceptedApplication, 0),
+				RejectedApplications: make([]*si.RejectedApplication, 0),
+				UpdatedApplications:  updatedApps,
+			})
+	}
+}
+
+// Set the starting timer to make sure the application will not get stuck in a starting state too long.
+// This prevents an app from not progressing to Running when it only has 1 allocation.
+// Called when entering the Starting state by the state machine.
+func (sa *Application) SetStartingTimer() {
+	log.Logger().Debug("Application Starting state timer initiated",
+		zap.String("appID", sa.ApplicationID),
+		zap.Duration("timeout", startingTimeout))
+	sa.stateTimer = time.AfterFunc(startingTimeout, sa.timeOutStarting)
+}
+
+// Clear the starting timer. If the application has progressed out of the starting state we need to stop the
+// timer and clean up.
+// Called when leaving the Starting state by the state machine.
+func (sa *Application) ClearStartingTimer() {
+	sa.stateTimer.Stop()
+	sa.stateTimer = nil
+}
+
+// In case of state aware scheduling we do not want to get stuck in starting as we might have an application that only
+// requires one allocation or is really slow asking for more than the first one.
+// This will progress the state of the application from Starting to Running
+func (sa *Application) timeOutStarting() {
+	// make sure we are still in the right state
+	// we could have been killed or something might have happened while waiting for a lock
+	if sa.IsStarting() {
+		log.Logger().Warn("Application in starting state timed out: auto progress",
+			zap.String("applicationID", sa.ApplicationID),
+			zap.String("state", sa.stateMachine.Current()))
+
+		//nolint: errcheck
+		_ = sa.HandleApplicationEvent(runApplication)
+	}
+}
+
+// Return an array of all reservation keys for the app.
+// This will return an empty array if there are no reservations.
+// Visible for tests
+func (sa *Application) GetReservations() []string {
+	sa.RLock()
+	defer sa.RUnlock()
+	keys := make([]string, 0)
+	for key := range sa.reservations {
+		keys = append(keys, key)
+	}
+	return keys
+}
+
+// Return the allocation ask for the key, nil if not found
+func (sa *Application) GetSchedulingAllocationAsk(allocationKey string) *AllocationAsk {
+	sa.RLock()
+	defer sa.RUnlock()
+	return sa.requests[allocationKey]
+}
+
+// Return the allocated resources for this application
+func (sa *Application) GetAllocatedResource() *resources.Resource {
+	sa.RLock()
+	defer sa.RUnlock()
+	return sa.allocatedResource.Clone()
+}
+
+// Return the pending resources for this application
+func (sa *Application) GetPendingResource() *resources.Resource {
+	sa.RLock()
+	defer sa.RUnlock()
+	return sa.pending
+}
+
+// Remove one or more allocation asks from this application.
+// This also removes any reservations that are linked to the ask.
+// The return value is the number of reservations released
+func (sa *Application) RemoveAllocationAsk(allocKey string) int {
+	sa.Lock()
+	defer sa.Unlock()
+	// shortcut no need to do anything
+	if len(sa.requests) == 0 {
+		return 0
+	}
+	var deltaPendingResource *resources.Resource = nil
+	// when allocation key not specified, cleanup all allocation ask
+	var toRelease int
+	if allocKey == "" {
+		// cleanup all reservations
+		for key, reserve := range sa.reservations {
+			releases, err := sa.unReserveInternal(reserve.node, reserve.ask)
+			if err != nil {
+				log.Logger().Warn("Removal of reservation failed while removing all allocation asks",
+					zap.String("appID", sa.ApplicationID),
+					zap.String("reservationKey", key),
+					zap.Error(err))
+				continue
+			}
+			// clean up the queue reservation (one at a time)
+			sa.queue.UnReserve(sa.ApplicationID, releases)
+			toRelease += releases
+		}
+		// Cleanup total pending resource
+		deltaPendingResource = sa.pending
+		sa.pending = resources.NewResource()
+		sa.requests = make(map[string]*AllocationAsk)
+	} else {
+		// cleanup the reservation for this allocation
+		for _, key := range sa.IsAskReserved(allocKey) {
+			reserve := sa.reservations[key]
+			releases, err := sa.unReserveInternal(reserve.node, reserve.ask)
+			if err != nil {
+				log.Logger().Warn("Removal of reservation failed while removing allocation ask",
+					zap.String("appID", sa.ApplicationID),
+					zap.String("reservationKey", key),
+					zap.Error(err))
+				continue
+			}
+			// clean up the queue reservation
+			sa.queue.UnReserve(sa.ApplicationID, releases)
+			toRelease += releases
+		}
+		if ask := sa.requests[allocKey]; ask != nil {
+			deltaPendingResource = resources.MultiplyBy(ask.AllocatedResource, float64(ask.GetPendingAskRepeat()))
+			sa.pending.SubFrom(deltaPendingResource)
+			delete(sa.requests, allocKey)
+		}
+	}
+	// clean up the queue pending resources
+	sa.queue.decPendingResource(deltaPendingResource)
+	// Check if we need to change state based on the ask removal:
+	// 1) if pending is zero (no more asks left)
+	// 2) if confirmed allocations is zero (nothing is running)
+	// Change the state to waiting.
+	// When the resource trackers are zero we should not expect anything to come in later.
+	if resources.IsZero(sa.pending) && resources.IsZero(sa.allocatedResource) {
+		if err := sa.HandleApplicationEvent(waitApplication); err != nil {
+			log.Logger().Warn("Application state not changed to Waiting while updating ask(s)",
+				zap.String("currentState", sa.CurrentState()),
+				zap.Error(err))
+		}
+	}
+
+	return toRelease
+}
+
+// Add an allocation ask to this application
+// If the ask already exist update the existing info
+func (sa *Application) AddAllocationAsk(ask *AllocationAsk) error {
+	sa.Lock()
+	defer sa.Unlock()
+	if ask == nil {
+		return fmt.Errorf("ask cannot be nil when added to app %s", sa.ApplicationID)
+	}
+	if ask.GetPendingAskRepeat() == 0 || resources.IsZero(ask.AllocatedResource) {
+		return fmt.Errorf("invalid ask added to app %s: %v", sa.ApplicationID, ask)
+	}
+	ask.setQueue(sa.queue.QueuePath)
+	delta := resources.Multiply(ask.AllocatedResource, int64(ask.GetPendingAskRepeat()))
+
+	var oldAskResource *resources.Resource = nil
+	if oldAsk := sa.requests[ask.AllocationKey]; oldAsk != nil {
+		oldAskResource = resources.Multiply(oldAsk.AllocatedResource, int64(oldAsk.GetPendingAskRepeat()))
+	}
+
+	// Check if we need to change state based on the ask added, there are two cases:
+	// 1) first ask added on a new app: state is New
+	// 2) all asks and allocation have been removed: state is Waiting
+	// Move the state and get it scheduling (again)
+	if sa.stateMachine.Is(New.String()) || sa.stateMachine.Is(Waiting.String()) {
+		if err := sa.HandleApplicationEvent(runApplication); err != nil {
+			log.Logger().Debug("Application state change failed while adding new ask",
+				zap.String("currentState", sa.CurrentState()),
+				zap.Error(err))
+		}
+	}
+	sa.requests[ask.AllocationKey] = ask
+
+	// Update total pending resource
+	delta.SubFrom(oldAskResource)
+	sa.pending.AddTo(delta)
+	sa.queue.incPendingResource(delta)
+
+	return nil
+}
+
+// Add the ask when a node allocation is recovered. Maintaining the rule that an Allocation always has a
+// link to an AllocationAsk.
+// Safeguarded against a nil but the recovery generates the ask and should never be nil.
+func (sa *Application) RecoverAllocationAsk(ask *AllocationAsk) {
+	sa.Lock()
+	defer sa.Unlock()
+	if ask == nil {
+		return
+	}
+	ask.setQueue(sa.queue.QueuePath)
+	sa.requests[ask.AllocationKey] = ask
+}
+
+func (sa *Application) updateAskRepeat(allocKey string, delta int32) (*resources.Resource, error) {
+	sa.Lock()
+	defer sa.Unlock()
+	if ask := sa.requests[allocKey]; ask != nil {
+		return sa.updateAskRepeatInternal(ask, delta)
+	}
+	return nil, fmt.Errorf("failed to locate ask with key %s", allocKey)
+}
+
+func (sa *Application) updateAskRepeatInternal(ask *AllocationAsk, delta int32) (*resources.Resource, error) {
+	// updating with delta does error checking internally
+	if !ask.UpdatePendingAskRepeat(delta) {
+		return nil, fmt.Errorf("ask repeat not updated, resulting repeat less than zero for ask %s on app %s", ask.AllocationKey, sa.ApplicationID)
+	}
+
+	deltaPendingResource := resources.Multiply(ask.AllocatedResource, int64(delta))
+	sa.pending.AddTo(deltaPendingResource)
+	// update the pending of the queue with the same delta
+	sa.queue.incPendingResource(deltaPendingResource)
+
+	return deltaPendingResource, nil
+}
+
+// Return if the application has any reservations.
+func (sa *Application) hasReserved() bool {
+	sa.RLock()
+	defer sa.RUnlock()
+	return len(sa.reservations) > 0
+}
+
+// Return if the application has the node reserved.
+// An empty nodeID is never reserved.
+func (sa *Application) IsReservedOnNode(nodeID string) bool {
+	if nodeID == "" {
+		return false
+	}
+	sa.RLock()
+	defer sa.RUnlock()
+	for key := range sa.reservations {
+		if strings.HasPrefix(key, nodeID) {
+			return true
+		}
+	}
+	return false
+}
+
+// Reserve the application for this node and ask combination.
+// If the reservation fails the function returns an error, if the reservation is made it returns nil.
+// If the node and ask combination was already reserved for the application this is a noop.
+func (sa *Application) Reserve(node *Node, ask *AllocationAsk) error {
+	sa.Lock()
+	defer sa.Unlock()
+	// create the reservation (includes nil checks)
+	nodeReservation := newReservation(node, sa, ask, true)
+	if nodeReservation == nil {
+		log.Logger().Debug("reservation creation failed unexpectedly",
+			zap.String("app", sa.ApplicationID),
+			zap.Any("node", node),
+			zap.Any("ask", ask))
+		return fmt.Errorf("reservation creation failed node or ask are nil on appID %s", sa.ApplicationID)
+	}
+	allocKey := ask.AllocationKey
+	if sa.requests[allocKey] == nil {
+		log.Logger().Debug("ask is not registered to this app",
+			zap.String("app", sa.ApplicationID),
+			zap.String("allocKey", allocKey))
+		return fmt.Errorf("reservation creation failed ask %s not found on appID %s", allocKey, sa.ApplicationID)
+	}
+	if !sa.canAskReserve(ask) {
+		return fmt.Errorf("reservation of ask exceeds pending repeat, pending ask repeat %d", ask.GetPendingAskRepeat())
+	}
+	// check if we can reserve the node before reserving on the app
+	if err := node.Reserve(sa, ask); err != nil {
+		return err
+	}
+	sa.reservations[nodeReservation.getKey()] = nodeReservation
+	log.Logger().Info("reservation added successfully", zap.String("node", node.NodeID),
+		zap.String("app", ask.ApplicationID), zap.String("ask", ask.AllocationKey))
+	return nil
+}
+
+// UnReserve the application for this node and ask combination.
+// This first removes the reservation from the node.
+// If the reservation does not exist it returns 0 for reservations removed, if the reservation is removed it returns 1.
+// The error is set if the reservation key cannot be removed from the app or node.
+func (sa *Application) UnReserve(node *Node, ask *AllocationAsk) (int, error) {
+	sa.Lock()
+	defer sa.Unlock()
+	return sa.unReserveInternal(node, ask)
+}
+
+// Unlocked version for UnReserve that really does the work.
+// Must only be called while holding the application lock.
+func (sa *Application) unReserveInternal(node *Node, ask *AllocationAsk) (int, error) {
+	resKey := reservationKey(node, nil, ask)
+	if resKey == "" {
+		log.Logger().Debug("unreserve reservation key create failed unexpectedly",
+			zap.String("appID", sa.ApplicationID),
+			zap.Any("node", node),
+			zap.Any("ask", ask))
+		return 0, fmt.Errorf("reservation key failed node or ask are nil for appID %s", sa.ApplicationID)
+	}
+	// unReserve the node before removing from the app
+	var num int
+	var err error
+	if num, err = node.unReserve(sa, ask); err != nil {
+		return 0, err
+	}
+	// if the unreserve worked on the node check the app
+	if _, found := sa.reservations[resKey]; found {
+		// worked on the node means either found or not but no error, log difference here
+		if num == 0 {
+			log.Logger().Info("reservation not found while removing from node, app has reservation",
+				zap.String("appID", sa.ApplicationID),
+				zap.String("nodeID", node.NodeID),
+				zap.String("ask", ask.AllocationKey))
+		}
+		delete(sa.reservations, resKey)
+		log.Logger().Info("reservation removed successfully", zap.String("node", node.NodeID),
+			zap.String("app", ask.ApplicationID), zap.String("ask", ask.AllocationKey))
+		return 1, nil
+	}
+	// reservation was not found
+	log.Logger().Info("reservation not found while removing from app",
+		zap.String("appID", sa.ApplicationID),
+		zap.String("nodeID", node.NodeID),
+		zap.String("ask", ask.AllocationKey),
+		zap.Int("nodeReservationsRemoved", num))
+	return 0, nil
+}
+
+// Return the allocation reservations on any node.
+// The returned array is 0 or more keys into the reservations map.
+// Does no locking, must be called while holding the application lock
+func (sa *Application) IsAskReserved(allocKey string) []string {
+	reservationKeys := make([]string, 0)
+	if allocKey == "" {
+		return reservationKeys
+	}
+	for key := range sa.reservations {
+		if strings.HasSuffix(key, allocKey) {
+			reservationKeys = append(reservationKeys, key)
+		}
+	}
+	return reservationKeys
+}
+
+// Check if the allocation has already been reserved. An ask can reserve multiple nodes if the request has a repeat set
+// larger than 1. It can never reserve more than the repeat number of nodes.
+// Does no locking, must be called while holding the application lock
+func (sa *Application) canAskReserve(ask *AllocationAsk) bool {
+	allocKey := ask.AllocationKey
+	pending := int(ask.GetPendingAskRepeat())
+	resNumber := sa.IsAskReserved(allocKey)
+	if len(resNumber) >= pending {
+		log.Logger().Debug("reservation exceeds repeats",
+			zap.String("askKey", allocKey),
+			zap.Int("askPending", pending),
+			zap.Int("askReserved", len(resNumber)))
+	}
+	return pending > len(resNumber)
+}
+
+// Sort the requests for the app based on the priority of the request.
+// The sorted list only contains candidates that have an outstanding repeat.
+// Does no locking, must be called while holding the application lock
+func (sa *Application) sortRequests(ascending bool) {
+	sa.sortedRequests = nil
+	for _, request := range sa.requests {
+		if request.GetPendingAskRepeat() == 0 {
+			continue
+		}
+		sa.sortedRequests = append(sa.sortedRequests, request)
+	}
+	// we might not have any requests
+	if len(sa.sortedRequests) > 0 {
+		sortAskByPriority(sa.sortedRequests, ascending)
+	}
+}
+
+func (sa *Application) getOutstandingRequests(headRoom *resources.Resource, total *[]*AllocationAsk) {
+	sa.RLock()
+	defer sa.RUnlock()
+
+	// make sure the request are sorted
+	sa.sortRequests(false)
+	for _, request := range sa.sortedRequests {
+		if headRoom == nil || resources.FitIn(headRoom, request.AllocatedResource) {
+			// if headroom is still enough for the resources
+			*total = append(*total, request)
+			if headRoom != nil {
+				headRoom.SubFrom(request.AllocatedResource)
+			}
+		}
+	}
+}
+
+// Try a regular allocation of the pending requests
+func (sa *Application) tryAllocate(headRoom *resources.Resource, nodeIterator func() interfaces.NodeIterator) *Allocation {
+	sa.Lock()
+	defer sa.Unlock()
+	// make sure the request are sorted
+	sa.sortRequests(false)
+	// get all the requests from the app sorted in order
+	for _, request := range sa.sortedRequests {
+		// resource must fit in headroom otherwise skip the request
+		if !resources.FitIn(headRoom, request.AllocatedResource) {
+			// post scheduling events via the event plugin
+			if eventCache := events.GetEventCache(); eventCache != nil {
+				message := fmt.Sprintf("Application %s does not fit into %s queue", request.ApplicationID, sa.QueueName)
+				if event, err := events.CreateRequestEventRecord(request.AllocationKey, request.ApplicationID, "InsufficientQueueResources", message); err != nil {
+					log.Logger().Warn("Event creation failed",
+						zap.String("event message", message),
+						zap.Error(err))
+				} else {
+					eventCache.AddEvent(event)
+				}
+			}
+			continue
+		}
+		iterator := nodeIterator()
+		if iterator != nil {
+			alloc := sa.tryNodes(request, iterator)
+			// have a candidate return it
+			if alloc != nil {
+				return alloc
+			}
+		}
+	}
+	// no requests fit, skip to next app
+	return nil
+}
+
+// Try a reserved allocation of an outstanding reservation
+func (sa *Application) tryReservedAllocate(headRoom *resources.Resource, nodeIterator func() interfaces.NodeIterator) *Allocation {
+	sa.Lock()
+	defer sa.Unlock()
+	// process all outstanding reservations and pick the first one that fits
+	for _, reserve := range sa.reservations {
+		ask := sa.requests[reserve.askKey]
+		// sanity check and cleanup if needed
+		if ask == nil || ask.GetPendingAskRepeat() == 0 {
+			var unreserveAsk *AllocationAsk
+			// if the ask was not found we need to construct one to unreserve
+			if ask == nil {
+				unreserveAsk = &AllocationAsk{
+					AllocationKey: reserve.askKey,
+					ApplicationID: sa.ApplicationID,
+				}
+			} else {
+				unreserveAsk = ask
+			}
+			// remove the reservation as this should not be reserved
+			alloc := newReservedAllocation(Unreserved, reserve.nodeID, unreserveAsk)
+			return alloc
+		}
+		// check if this fits in the queue's head room
+		if !resources.FitIn(headRoom, ask.AllocatedResource) {
+			continue
+		}
+		// check allocation possibility
+		alloc := sa.tryNode(reserve.node, ask)
+		// allocation worked fix the result and return
+		if alloc != nil {
+			alloc.Result = AllocatedReserved
+			return alloc
+		}
+	}
+	// lets try this on all other nodes
+	for _, reserve := range sa.reservations {
+		iterator := nodeIterator()
+		if iterator != nil {
+			alloc := sa.tryNodesNoReserve(reserve.ask, iterator, reserve.nodeID)
+			// have a candidate return it, including the node that was reserved
+			if alloc != nil {
+				return alloc
+			}
+		}
+	}
+	return nil
+}
+
+// Try all the nodes for a reserved request that have not been tried yet.
+// This should never result in a reservation as the ask is already reserved
+func (sa *Application) tryNodesNoReserve(ask *AllocationAsk, iterator interfaces.NodeIterator, reservedNode string) *Allocation {
+	for iterator.HasNext() {
+		node, ok := iterator.Next().(*Node)
+		if !ok {
+			log.Logger().Debug("Node iterator failed to return a node")

Review comment:
       ERROR level might be more appropriate for this
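   i.e. something like (same message, just raised to error level):
   ```go
   log.Logger().Error("Node iterator failed to return a node")
   ```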

##########
File path: pkg/scheduler/objects/application.go
##########
@@ -0,0 +1,926 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package objects
+
+import (
+	"fmt"
+	"math"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/looplab/fsm"
+	"go.uber.org/zap"
+
+	"github.com/apache/incubator-yunikorn-core/pkg/common"
+	"github.com/apache/incubator-yunikorn-core/pkg/common/resources"
+	"github.com/apache/incubator-yunikorn-core/pkg/common/security"
+	"github.com/apache/incubator-yunikorn-core/pkg/events"
+	"github.com/apache/incubator-yunikorn-core/pkg/handler"
+	"github.com/apache/incubator-yunikorn-core/pkg/interfaces"
+	"github.com/apache/incubator-yunikorn-core/pkg/log"
+	"github.com/apache/incubator-yunikorn-core/pkg/rmproxy/rmevent"
+	"github.com/apache/incubator-yunikorn-scheduler-interface/lib/go/si"
+)
+
+var (
+	reservationDelay = 2 * time.Second
+	startingTimeout  = time.Minute * 5
+)
+
+type Application struct {
+	ApplicationID  string
+	Partition      string
+	QueueName      string
+	SubmissionTime time.Time
+
+	// Private fields need protection
+	queue             *Queue                    // queue the application is running in
+	pending           *resources.Resource       // pending resources from asks for the app
+	reservations      map[string]*reservation   // a map of reservations
+	requests          map[string]*AllocationAsk // a map of asks
+	sortedRequests    []*AllocationAsk
+	user              security.UserGroup     // owner of the application
+	tags              map[string]string      // application tags used in scheduling
+	allocatedResource *resources.Resource    // total allocated resources
+	allocations       map[string]*Allocation // list of all allocations
+	stateMachine      *fsm.FSM               // application state machine
+	stateTimer        *time.Timer            // timer for state time
+
+	rmEventHandler handler.EventHandler
+	rmID           string
+
+	sync.RWMutex
+}
+
+func newBlankApplication(appID, partition, queueName string, ugi security.UserGroup, tags map[string]string) *Application {
+	return &Application{
+		ApplicationID:     appID,
+		Partition:         partition,
+		QueueName:         queueName,
+		SubmissionTime:    time.Now(),
+		user:              ugi,
+		tags:              tags,
+		pending:           resources.NewResource(),
+		allocatedResource: resources.NewResource(),
+		requests:          make(map[string]*AllocationAsk),
+		reservations:      make(map[string]*reservation),
+		allocations:       make(map[string]*Allocation),
+		stateMachine:      NewAppState(),
+	}
+}
+
+func NewApplication(appID, partition, queueName string, ugi security.UserGroup, tags map[string]string, eventHandler handler.EventHandler, rmID string) *Application {
+	app := newBlankApplication(appID, partition, queueName, ugi, tags)
+	app.rmEventHandler = eventHandler
+	app.rmID = rmID
+	return app
+}
+
+func (sa *Application) String() string {
+	if sa == nil {
+		return "application is nil"
+	}
+	return fmt.Sprintf("ApplicationID: %s, Partition: %s, QueueName: %s, SubmissionTime: %x",
+		sa.ApplicationID, sa.Partition, sa.QueueName, sa.SubmissionTime)
+}
+
+// Set the reservation delay.
+// Set when the cluster context is created to disable reservation.
+func SetReservationDelay(delay time.Duration) {
+	log.Logger().Debug("Set reservation delay",
+		zap.Duration("delay", delay))
+	reservationDelay = delay
+}
+
+// Return the current state or a checked specific state for the application.
+// The state machine handles the locking.
+func (sa *Application) CurrentState() string {
+	return sa.stateMachine.Current()
+}
+
+func (sa *Application) IsStarting() bool {
+	return sa.stateMachine.Is(Starting.String())
+}
+
+func (sa *Application) IsAccepted() bool {
+	return sa.stateMachine.Is(Accepted.String())
+}
+
+func (sa *Application) IsNew() bool {
+	return sa.stateMachine.Is(New.String())
+}
+
+func (sa *Application) IsRunning() bool {
+	return sa.stateMachine.Is(Running.String())
+}
+
+func (sa *Application) IsWaiting() bool {
+	return sa.stateMachine.Is(Waiting.String())
+}
+
+// Handle the state event for the application.
+// The state machine handles the locking.
+func (sa *Application) HandleApplicationEvent(event applicationEvent) error {
+	err := sa.stateMachine.Event(event.String(), sa)
+	// handle the non-nil error the fsm returns for a same-state transition (fsm limitation).
+	if err != nil && err.Error() == noTransition {
+		return nil
+	}
+	return err
+}
+
+func (sa *Application) OnStateChange(event *fsm.Event) {
+	updatedApps := make([]*si.UpdatedApplication, 0)
+	updatedApps = append(updatedApps, &si.UpdatedApplication{
+		ApplicationID:            sa.ApplicationID,
+		State:                    sa.stateMachine.Current(),
+		StateTransitionTimestamp: time.Now().UnixNano(),
+		Message:                  fmt.Sprintf("{Status change triggered by the event : %v}", event),
+	})
+
+	if sa.rmEventHandler != nil {
+		sa.rmEventHandler.HandleEvent(
+			&rmevent.RMApplicationUpdateEvent{
+				RmID:                 sa.rmID,
+				AcceptedApplications: make([]*si.AcceptedApplication, 0),
+				RejectedApplications: make([]*si.RejectedApplication, 0),
+				UpdatedApplications:  updatedApps,
+			})
+	}
+}
+
+// Set the starting timer to make sure the application will not get stuck in a starting state too long.
+// This prevents an app from not progressing to Running when it only has 1 allocation.
+// Called when entering the Starting state by the state machine.
+func (sa *Application) SetStartingTimer() {
+	log.Logger().Debug("Application Starting state timer initiated",
+		zap.String("appID", sa.ApplicationID),
+		zap.Duration("timeout", startingTimeout))
+	sa.stateTimer = time.AfterFunc(startingTimeout, sa.timeOutStarting)
+}
+
+// Clear the starting timer. If the application has progressed out of the starting state we need to stop the
+// timer and clean up.
+// Called when leaving the Starting state by the state machine.
+func (sa *Application) ClearStartingTimer() {
+	sa.stateTimer.Stop()
+	sa.stateTimer = nil
+}
+
+// In case of state aware scheduling we do not want to get stuck in starting as we might have an application that only
+// requires one allocation or is really slow asking for more than the first one.
+// This will progress the state of the application from Starting to Running
+func (sa *Application) timeOutStarting() {
+	// make sure we are still in the right state
+	// we could have been killed or something might have happened while waiting for a lock
+	if sa.IsStarting() {
+		log.Logger().Warn("Application in starting state timed out: auto progress",
+			zap.String("applicationID", sa.ApplicationID),
+			zap.String("state", sa.stateMachine.Current()))
+
+		//nolint: errcheck
+		_ = sa.HandleApplicationEvent(runApplication)
+	}
+}
+
+// Return an array of all reservation keys for the app.
+// This will return an empty array if there are no reservations.
+// Visible for tests
+func (sa *Application) GetReservations() []string {
+	sa.RLock()
+	defer sa.RUnlock()
+	keys := make([]string, 0)
+	for key := range sa.reservations {
+		keys = append(keys, key)
+	}
+	return keys
+}
+
+// Return the allocation ask for the key, nil if not found
+func (sa *Application) GetSchedulingAllocationAsk(allocationKey string) *AllocationAsk {
+	sa.RLock()
+	defer sa.RUnlock()
+	return sa.requests[allocationKey]
+}
+
+// Return the allocated resources for this application
+func (sa *Application) GetAllocatedResource() *resources.Resource {
+	sa.RLock()
+	defer sa.RUnlock()
+	return sa.allocatedResource.Clone()
+}
+
+// Return the pending resources for this application
+func (sa *Application) GetPendingResource() *resources.Resource {
+	sa.RLock()
+	defer sa.RUnlock()
+	return sa.pending
+}
+
+// Remove one or more allocation asks from this application.
+// This also removes any reservations that are linked to the ask.
+// The return value is the number of reservations released
+func (sa *Application) RemoveAllocationAsk(allocKey string) int {
+	sa.Lock()
+	defer sa.Unlock()
+	// shortcut no need to do anything
+	if len(sa.requests) == 0 {
+		return 0
+	}
+	var deltaPendingResource *resources.Resource = nil
+	// when allocation key not specified, cleanup all allocation ask
+	var toRelease int
+	if allocKey == "" {
+		// cleanup all reservations
+		for key, reserve := range sa.reservations {
+			releases, err := sa.unReserveInternal(reserve.node, reserve.ask)
+			if err != nil {
+				log.Logger().Warn("Removal of reservation failed while removing all allocation asks",
+					zap.String("appID", sa.ApplicationID),
+					zap.String("reservationKey", key),
+					zap.Error(err))
+				continue
+			}
+			// clean up the queue reservation (one at a time)
+			sa.queue.UnReserve(sa.ApplicationID, releases)
+			toRelease += releases
+		}
+		// Cleanup total pending resource
+		deltaPendingResource = sa.pending
+		sa.pending = resources.NewResource()
+		sa.requests = make(map[string]*AllocationAsk)
+	} else {
+		// cleanup the reservation for this allocation
+		for _, key := range sa.IsAskReserved(allocKey) {
+			reserve := sa.reservations[key]
+			releases, err := sa.unReserveInternal(reserve.node, reserve.ask)
+			if err != nil {
+				log.Logger().Warn("Removal of reservation failed while removing allocation ask",
+					zap.String("appID", sa.ApplicationID),
+					zap.String("reservationKey", key),
+					zap.Error(err))
+				continue
+			}
+			// clean up the queue reservation
+			sa.queue.UnReserve(sa.ApplicationID, releases)
+			toRelease += releases
+		}
+		if ask := sa.requests[allocKey]; ask != nil {
+			deltaPendingResource = resources.MultiplyBy(ask.AllocatedResource, float64(ask.GetPendingAskRepeat()))
+			sa.pending.SubFrom(deltaPendingResource)
+			delete(sa.requests, allocKey)
+		}
+	}
+	// clean up the queue pending resources
+	sa.queue.decPendingResource(deltaPendingResource)
+	// Check if we need to change state based on the ask removal:
+	// 1) if pending is zero (no more asks left)
+	// 2) if confirmed allocations is zero (nothing is running)
+	// Change the state to waiting.
+	// When the resource trackers are zero we should not expect anything to come in later.
+	if resources.IsZero(sa.pending) && resources.IsZero(sa.allocatedResource) {
+		if err := sa.HandleApplicationEvent(waitApplication); err != nil {
+			log.Logger().Warn("Application state not changed to Waiting while updating ask(s)",
+				zap.String("currentState", sa.CurrentState()),
+				zap.Error(err))
+		}
+	}
+
+	return toRelease
+}
+
+// Add an allocation ask to this application
+// If the ask already exist update the existing info
+func (sa *Application) AddAllocationAsk(ask *AllocationAsk) error {
+	sa.Lock()
+	defer sa.Unlock()
+	if ask == nil {
+		return fmt.Errorf("ask cannot be nil when added to app %s", sa.ApplicationID)
+	}
+	if ask.GetPendingAskRepeat() == 0 || resources.IsZero(ask.AllocatedResource) {
+		return fmt.Errorf("invalid ask added to app %s: %v", sa.ApplicationID, ask)
+	}
+	ask.setQueue(sa.queue.QueuePath)
+	delta := resources.Multiply(ask.AllocatedResource, int64(ask.GetPendingAskRepeat()))
+
+	var oldAskResource *resources.Resource = nil
+	if oldAsk := sa.requests[ask.AllocationKey]; oldAsk != nil {
+		oldAskResource = resources.Multiply(oldAsk.AllocatedResource, int64(oldAsk.GetPendingAskRepeat()))
+	}
+
+	// Check if we need to change state based on the ask added, there are two cases:
+	// 1) first ask added on a new app: state is New
+	// 2) all asks and allocation have been removed: state is Waiting
+	// Move the state and get it scheduling (again)
+	if sa.stateMachine.Is(New.String()) || sa.stateMachine.Is(Waiting.String()) {
+		if err := sa.HandleApplicationEvent(runApplication); err != nil {
+			log.Logger().Debug("Application state change failed while adding new ask",
+				zap.String("currentState", sa.CurrentState()),
+				zap.Error(err))
+		}
+	}
+	sa.requests[ask.AllocationKey] = ask
+
+	// Update total pending resource
+	delta.SubFrom(oldAskResource)
+	sa.pending.AddTo(delta)
+	sa.queue.incPendingResource(delta)
+
+	return nil
+}
+
+// Add the ask when a node allocation is recovered. Maintaining the rule that an Allocation always has a
+// link to an AllocationAsk.
+// Safeguarded against a nil but the recovery generates the ask and should never be nil.
+func (sa *Application) RecoverAllocationAsk(ask *AllocationAsk) {
+	sa.Lock()
+	defer sa.Unlock()
+	if ask == nil {
+		return
+	}
+	ask.setQueue(sa.queue.QueuePath)
+	sa.requests[ask.AllocationKey] = ask
+}
+
+func (sa *Application) updateAskRepeat(allocKey string, delta int32) (*resources.Resource, error) {
+	sa.Lock()
+	defer sa.Unlock()
+	if ask := sa.requests[allocKey]; ask != nil {
+		return sa.updateAskRepeatInternal(ask, delta)
+	}
+	return nil, fmt.Errorf("failed to locate ask with key %s", allocKey)
+}
+
+func (sa *Application) updateAskRepeatInternal(ask *AllocationAsk, delta int32) (*resources.Resource, error) {
+	// updating with delta does error checking internally
+	if !ask.UpdatePendingAskRepeat(delta) {
+		return nil, fmt.Errorf("ask repeat not updated, resulting repeat less than zero for ask %s on app %s", ask.AllocationKey, sa.ApplicationID)
+	}
+
+	deltaPendingResource := resources.Multiply(ask.AllocatedResource, int64(delta))
+	sa.pending.AddTo(deltaPendingResource)
+	// update the pending of the queue with the same delta
+	sa.queue.incPendingResource(deltaPendingResource)
+
+	return deltaPendingResource, nil
+}
+
+// Return if the application has any reservations.
+func (sa *Application) hasReserved() bool {
+	sa.RLock()
+	defer sa.RUnlock()
+	return len(sa.reservations) > 0
+}
+
+// Return if the application has the node reserved.
+// An empty nodeID is never reserved.
+func (sa *Application) IsReservedOnNode(nodeID string) bool {
+	if nodeID == "" {
+		return false
+	}
+	sa.RLock()
+	defer sa.RUnlock()
+	for key := range sa.reservations {
+		if strings.HasPrefix(key, nodeID) {
+			return true
+		}
+	}
+	return false
+}
+
+// Reserve the application for this node and ask combination.
+// If the reservation fails the function returns an error, if the reservation is made it returns nil.
+// If the node and ask combination was already reserved for the application this is a noop.
+func (sa *Application) Reserve(node *Node, ask *AllocationAsk) error {
+	sa.Lock()
+	defer sa.Unlock()
+	// create the reservation (includes nil checks)
+	nodeReservation := newReservation(node, sa, ask, true)
+	if nodeReservation == nil {
+		log.Logger().Debug("reservation creation failed unexpectedly",
+			zap.String("app", sa.ApplicationID),
+			zap.Any("node", node),
+			zap.Any("ask", ask))
+		return fmt.Errorf("reservation creation failed node or ask are nil on appID %s", sa.ApplicationID)
+	}
+	allocKey := ask.AllocationKey
+	if sa.requests[allocKey] == nil {
+		log.Logger().Debug("ask is not registered to this app",
+			zap.String("app", sa.ApplicationID),
+			zap.String("allocKey", allocKey))
+		return fmt.Errorf("reservation creation failed ask %s not found on appID %s", allocKey, sa.ApplicationID)
+	}
+	if !sa.canAskReserve(ask) {
+		return fmt.Errorf("reservation of ask exceeds pending repeat, pending ask repeat %d", ask.GetPendingAskRepeat())
+	}
+	// check if we can reserve the node before reserving on the app
+	if err := node.Reserve(sa, ask); err != nil {
+		return err
+	}
+	sa.reservations[nodeReservation.getKey()] = nodeReservation
+	log.Logger().Info("reservation added successfully", zap.String("node", node.NodeID),
+		zap.String("app", ask.ApplicationID), zap.String("ask", ask.AllocationKey))
+	return nil
+}
+
+// UnReserve the application for this node and ask combination.
+// This first removes the reservation from the node.
+// If the reservation does not exist it returns 0 for reservations removed, if the reservation is removed it returns 1.
+// The error is set if the reservation key cannot be removed from the app or node.
+func (sa *Application) UnReserve(node *Node, ask *AllocationAsk) (int, error) {
+	sa.Lock()
+	defer sa.Unlock()
+	return sa.unReserveInternal(node, ask)
+}
+
+// Unlocked version for UnReserve that really does the work.
+// Must only be called while holding the application lock.
+func (sa *Application) unReserveInternal(node *Node, ask *AllocationAsk) (int, error) {
+	resKey := reservationKey(node, nil, ask)
+	if resKey == "" {
+		log.Logger().Debug("unreserve reservation key create failed unexpectedly",
+			zap.String("appID", sa.ApplicationID),
+			zap.Any("node", node),
+			zap.Any("ask", ask))
+		return 0, fmt.Errorf("reservation key failed node or ask are nil for appID %s", sa.ApplicationID)
+	}
+	// unReserve the node before removing from the app
+	var num int
+	var err error
+	if num, err = node.unReserve(sa, ask); err != nil {
+		return 0, err
+	}
+	// if the unreserve worked on the node check the app
+	if _, found := sa.reservations[resKey]; found {
+		// unreserve worked on the node (no error) whether the reservation was found there or not, log the mismatch here
+		if num == 0 {
+			log.Logger().Info("reservation not found while removing from node, app has reservation",
+				zap.String("appID", sa.ApplicationID),
+				zap.String("nodeID", node.NodeID),
+				zap.String("ask", ask.AllocationKey))
+		}
+		delete(sa.reservations, resKey)
+		log.Logger().Info("reservation removed successfully", zap.String("node", node.NodeID),
+			zap.String("app", ask.ApplicationID), zap.String("ask", ask.AllocationKey))
+		return 1, nil
+	}
+	// reservation was not found
+	log.Logger().Info("reservation not found while removing from app",
+		zap.String("appID", sa.ApplicationID),
+		zap.String("nodeID", node.NodeID),
+		zap.String("ask", ask.AllocationKey),
+		zap.Int("nodeReservationsRemoved", num))
+	return 0, nil
+}
+
+// Return the allocation reservations on any node.
+// The returned array is 0 or more keys into the reservations map.
+// No locking, must only be called while holding the application lock
+func (sa *Application) IsAskReserved(allocKey string) []string {
+	reservationKeys := make([]string, 0)
+	if allocKey == "" {
+		return reservationKeys
+	}
+	for key := range sa.reservations {
+		if strings.HasSuffix(key, allocKey) {
+			reservationKeys = append(reservationKeys, key)
+		}
+	}
+	return reservationKeys
+}
+
+// Check if the allocation has already been reserved. An ask can reserve multiple nodes if the request has a repeat set
+// larger than 1. It can never reserve more than the repeat number of nodes.
+// No locking, must only be called while holding the application lock
+func (sa *Application) canAskReserve(ask *AllocationAsk) bool {
+	allocKey := ask.AllocationKey
+	pending := int(ask.GetPendingAskRepeat())
+	resNumber := sa.IsAskReserved(allocKey)
+	if len(resNumber) >= pending {
+		log.Logger().Debug("reservation exceeds repeats",
+			zap.String("askKey", allocKey),
+			zap.Int("askPending", pending),
+			zap.Int("askReserved", len(resNumber)))
+	}
+	return pending > len(resNumber)
+}
+
+// Sort the request for the app in order based on the priority of the request.
+// The sorted list only contains candidates that have an outstanding repeat.
+// No locking, must only be called while holding the application lock
+func (sa *Application) sortRequests(ascending bool) {
+	sa.sortedRequests = nil
+	for _, request := range sa.requests {
+		if request.GetPendingAskRepeat() == 0 {
+			continue
+		}
+		sa.sortedRequests = append(sa.sortedRequests, request)
+	}
+	// we might not have any requests
+	if len(sa.sortedRequests) > 0 {
+		sortAskByPriority(sa.sortedRequests, ascending)
+	}
+}
+
+func (sa *Application) getOutstandingRequests(headRoom *resources.Resource, total *[]*AllocationAsk) {
+	sa.RLock()
+	defer sa.RUnlock()
+
+	// make sure the requests are sorted
+	sa.sortRequests(false)
+	for _, request := range sa.sortedRequests {
+		if headRoom == nil || resources.FitIn(headRoom, request.AllocatedResource) {
+			// if headroom is still enough for the resources
+			*total = append(*total, request)
+			if headRoom != nil {
+				headRoom.SubFrom(request.AllocatedResource)
+			}
+		}
+	}
+}
+
+// Try a regular allocation of the pending requests
+func (sa *Application) tryAllocate(headRoom *resources.Resource, nodeIterator func() interfaces.NodeIterator) *Allocation {
+	sa.Lock()
+	defer sa.Unlock()
+	// make sure the requests are sorted
+	sa.sortRequests(false)
+	// get all the requests from the app sorted in order
+	for _, request := range sa.sortedRequests {
+		// resource must fit in headroom otherwise skip the request
+		if !resources.FitIn(headRoom, request.AllocatedResource) {
+			// post scheduling events via the event plugin
+			if eventCache := events.GetEventCache(); eventCache != nil {
+				message := fmt.Sprintf("Application %s does not fit into %s queue", request.ApplicationID, sa.QueueName)
+				if event, err := events.CreateRequestEventRecord(request.AllocationKey, request.ApplicationID, "InsufficientQueueResources", message); err != nil {
+					log.Logger().Warn("Event creation failed",
+						zap.String("event message", message),
+						zap.Error(err))
+				} else {
+					eventCache.AddEvent(event)
+				}
+			}
+			continue
+		}
+		iterator := nodeIterator()
+		if iterator != nil {
+			alloc := sa.tryNodes(request, iterator)
+			// have a candidate return it
+			if alloc != nil {
+				return alloc
+			}
+		}
+	}
+	// no requests fit, skip to next app
+	return nil
+}
+
+// Try a reserved allocation of an outstanding reservation
+func (sa *Application) tryReservedAllocate(headRoom *resources.Resource, nodeIterator func() interfaces.NodeIterator) *Allocation {
+	sa.Lock()
+	defer sa.Unlock()
+	// process all outstanding reservations and pick the first one that fits
+	for _, reserve := range sa.reservations {
+		ask := sa.requests[reserve.askKey]
+		// sanity check and cleanup if needed
+		if ask == nil || ask.GetPendingAskRepeat() == 0 {
+			var unreserveAsk *AllocationAsk
+			// if the ask was not found we need to construct one to unreserve
+			if ask == nil {
+				unreserveAsk = &AllocationAsk{
+					AllocationKey: reserve.askKey,
+					ApplicationID: sa.ApplicationID,
+				}
+			} else {
+				unreserveAsk = ask
+			}
+			// remove the reservation as this should not be reserved
+			alloc := newReservedAllocation(Unreserved, reserve.nodeID, unreserveAsk)
+			return alloc
+		}
+		// check if this fits in the queue's head room
+		if !resources.FitIn(headRoom, ask.AllocatedResource) {
+			continue
+		}
+		// check allocation possibility
+		alloc := sa.tryNode(reserve.node, ask)
+		// allocation worked fix the result and return
+		if alloc != nil {
+			alloc.Result = AllocatedReserved
+			return alloc
+		}
+	}
+	// let's try this on all other nodes
+	for _, reserve := range sa.reservations {
+		iterator := nodeIterator()
+		if iterator != nil {
+			alloc := sa.tryNodesNoReserve(reserve.ask, iterator, reserve.nodeID)
+			// have a candidate return it, including the node that was reserved
+			if alloc != nil {
+				return alloc
+			}
+		}
+	}
+	return nil
+}
+
+// Try all the nodes for a reserved request that have not been tried yet.
+// This should never result in a reservation as the ask is already reserved
+func (sa *Application) tryNodesNoReserve(ask *AllocationAsk, iterator interfaces.NodeIterator, reservedNode string) *Allocation {
+	for iterator.HasNext() {
+		node, ok := iterator.Next().(*Node)

Review comment:
       are we safe here when `ok` is `false`?
   it looks like it does a cast even though `node` could still be nil
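
For reference on the comma-ok question above, a minimal, self-contained sketch (hypothetical stand-in types only, not code from this PR; the real `interfaces.NodeIterator` and `objects.Node` are just mimicked here) of how the assertion behaves when the iterator hands back something that is not a usable `*Node`:

```go
package main

import "fmt"

// Hypothetical stand-ins for illustration; the real types live in
// pkg/interfaces (NodeIterator) and pkg/scheduler/objects (Node).
type Node struct{ NodeID string }

type NodeIterator interface {
	HasNext() bool
	Next() interface{}
}

type sliceIterator struct {
	items []interface{}
	idx   int
}

func (s *sliceIterator) HasNext() bool { return s.idx < len(s.items) }
func (s *sliceIterator) Next() interface{} {
	v := s.items[s.idx]
	s.idx++
	return v
}

func main() {
	var it NodeIterator = &sliceIterator{items: []interface{}{
		&Node{NodeID: "node-1"}, // usable node
		nil,                     // untyped nil entry: assertion fails, ok == false
		"not-a-node",            // wrong type: assertion fails, ok == false
		(*Node)(nil),            // typed nil: assertion succeeds but node is nil
	}}
	for it.HasNext() {
		node, ok := it.Next().(*Node)
		// The comma-ok assertion never panics: on failure node is nil and ok is false.
		// A typed nil *Node passes the assertion, which is why an extra nil check can matter.
		if !ok || node == nil {
			fmt.Println("skipping entry that is not a usable *Node")
			continue
		}
		fmt.Println("scheduling on", node.NodeID)
	}
}
```

So the `if !ok { ... return nil }` guard in the PR does cover a failed assertion; the open question is whether bailing out of the loop is better than skipping the entry and continuing.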

##########
File path: pkg/scheduler/objects/application.go
##########
@@ -0,0 +1,926 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package objects
+
+import (
+	"fmt"
+	"math"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/looplab/fsm"
+	"go.uber.org/zap"
+
+	"github.com/apache/incubator-yunikorn-core/pkg/common"
+	"github.com/apache/incubator-yunikorn-core/pkg/common/resources"
+	"github.com/apache/incubator-yunikorn-core/pkg/common/security"
+	"github.com/apache/incubator-yunikorn-core/pkg/events"
+	"github.com/apache/incubator-yunikorn-core/pkg/handler"
+	"github.com/apache/incubator-yunikorn-core/pkg/interfaces"
+	"github.com/apache/incubator-yunikorn-core/pkg/log"
+	"github.com/apache/incubator-yunikorn-core/pkg/rmproxy/rmevent"
+	"github.com/apache/incubator-yunikorn-scheduler-interface/lib/go/si"
+)
+
+var (
+	reservationDelay = 2 * time.Second
+	startingTimeout  = time.Minute * 5
+)
+
+type Application struct {
+	ApplicationID  string
+	Partition      string
+	QueueName      string
+	SubmissionTime time.Time
+
+	// Private fields need protection
+	queue             *Queue                    // queue the application is running in
+	pending           *resources.Resource       // pending resources from asks for the app
+	reservations      map[string]*reservation   // a map of reservations
+	requests          map[string]*AllocationAsk // a map of asks
+	sortedRequests    []*AllocationAsk
+	user              security.UserGroup     // owner of the application
+	tags              map[string]string      // application tags used in scheduling
+	allocatedResource *resources.Resource    // total allocated resources
+	allocations       map[string]*Allocation // list of all allocations
+	stateMachine      *fsm.FSM               // application state machine
+	stateTimer        *time.Timer            // timer for state time
+
+	rmEventHandler handler.EventHandler
+	rmID           string
+
+	sync.RWMutex
+}
+
+func newBlankApplication(appID, partition, queueName string, ugi security.UserGroup, tags map[string]string) *Application {
+	return &Application{
+		ApplicationID:     appID,
+		Partition:         partition,
+		QueueName:         queueName,
+		SubmissionTime:    time.Now(),
+		user:              ugi,
+		tags:              tags,
+		pending:           resources.NewResource(),
+		allocatedResource: resources.NewResource(),
+		requests:          make(map[string]*AllocationAsk),
+		reservations:      make(map[string]*reservation),
+		allocations:       make(map[string]*Allocation),
+		stateMachine:      NewAppState(),
+	}
+}
+
+func NewApplication(appID, partition, queueName string, ugi security.UserGroup, tags map[string]string, eventHandler handler.EventHandler, rmID string) *Application {
+	app := newBlankApplication(appID, partition, queueName, ugi, tags)
+	app.rmEventHandler = eventHandler
+	app.rmID = rmID
+	return app
+}
+
+func (sa *Application) String() string {
+	if sa == nil {
+		return "application is nil"
+	}
+	return fmt.Sprintf("ApplicationID: %s, Partition: %s, QueueName: %s, SubmissionTime: %x",
+		sa.ApplicationID, sa.Partition, sa.QueueName, sa.SubmissionTime)
+}
+
+// Set the reservation delay.
+// Set when the cluster context is created to disable reservation.
+func SetReservationDelay(delay time.Duration) {
+	log.Logger().Debug("Set reservation delay",
+		zap.Duration("delay", delay))
+	reservationDelay = delay
+}
+
+// Return the current state or a checked specific state for the application.
+// The state machine handles the locking.
+func (sa *Application) CurrentState() string {
+	return sa.stateMachine.Current()
+}
+
+func (sa *Application) IsStarting() bool {
+	return sa.stateMachine.Is(Starting.String())
+}
+
+func (sa *Application) IsAccepted() bool {
+	return sa.stateMachine.Is(Accepted.String())
+}
+
+func (sa *Application) IsNew() bool {
+	return sa.stateMachine.Is(New.String())
+}
+
+func (sa *Application) IsRunning() bool {
+	return sa.stateMachine.Is(Running.String())
+}
+
+func (sa *Application) IsWaiting() bool {
+	return sa.stateMachine.Is(Waiting.String())
+}
+
+// Handle the state event for the application.
+// The state machine handles the locking.
+func (sa *Application) HandleApplicationEvent(event applicationEvent) error {
+	err := sa.stateMachine.Event(event.String(), sa)
+	// handle the same state transition not nil error (limit of fsm).
+	if err != nil && err.Error() == noTransition {
+		return nil
+	}
+	return err
+}
+
+func (sa *Application) OnStateChange(event *fsm.Event) {
+	updatedApps := make([]*si.UpdatedApplication, 0)
+	updatedApps = append(updatedApps, &si.UpdatedApplication{
+		ApplicationID:            sa.ApplicationID,
+		State:                    sa.stateMachine.Current(),
+		StateTransitionTimestamp: time.Now().UnixNano(),
+		Message:                  fmt.Sprintf("{Status change triggered by the event : %v}", event),
+	})
+
+	if sa.rmEventHandler != nil {
+		sa.rmEventHandler.HandleEvent(
+			&rmevent.RMApplicationUpdateEvent{
+				RmID:                 sa.rmID,
+				AcceptedApplications: make([]*si.AcceptedApplication, 0),
+				RejectedApplications: make([]*si.RejectedApplication, 0),
+				UpdatedApplications:  updatedApps,
+			})
+	}
+}
+
+// Set the starting timer to make sure the application will not get stuck in a starting state too long.
+// This prevents an app from not progressing to Running when it only has 1 allocation.
+// Called when entering the Starting state by the state machine.
+func (sa *Application) SetStartingTimer() {
+	log.Logger().Debug("Application Starting state timer initiated",
+		zap.String("appID", sa.ApplicationID),
+		zap.Duration("timeout", startingTimeout))
+	sa.stateTimer = time.AfterFunc(startingTimeout, sa.timeOutStarting)
+}
+
+// Clear the starting timer. If the application has progressed out of the starting state we need to stop the
+// timer and clean up.
+// Called when leaving the Starting state by the state machine.
+func (sa *Application) ClearStartingTimer() {
+	sa.stateTimer.Stop()
+	sa.stateTimer = nil
+}
+
+// In case of state aware scheduling we do not want to get stuck in starting as we might have an application that only
+// requires one allocation or is really slow asking for more than the first one.
+// This will progress the state of the application from Starting to Running
+func (sa *Application) timeOutStarting() {
+	// make sure we are still in the right state
+	// we could have been killed or something might have happened while waiting for a lock
+	if sa.IsStarting() {
+		log.Logger().Warn("Application in starting state timed out: auto progress",
+			zap.String("applicationID", sa.ApplicationID),
+			zap.String("state", sa.stateMachine.Current()))
+
+		//nolint: errcheck
+		_ = sa.HandleApplicationEvent(runApplication)
+	}
+}
+
+// Return an array of all reservation keys for the app.
+// This will return an empty array if there are no reservations.
+// Visible for tests
+func (sa *Application) GetReservations() []string {
+	sa.RLock()
+	defer sa.RUnlock()
+	keys := make([]string, 0)
+	for key := range sa.reservations {
+		keys = append(keys, key)
+	}
+	return keys
+}
+
+// Return the allocation ask for the key, nil if not found
+func (sa *Application) GetSchedulingAllocationAsk(allocationKey string) *AllocationAsk {
+	sa.RLock()
+	defer sa.RUnlock()
+	return sa.requests[allocationKey]
+}
+
+// Return the allocated resources for this application
+func (sa *Application) GetAllocatedResource() *resources.Resource {
+	sa.RLock()
+	defer sa.RUnlock()
+	return sa.allocatedResource.Clone()
+}
+
+// Return the pending resources for this application
+func (sa *Application) GetPendingResource() *resources.Resource {
+	sa.RLock()
+	defer sa.RUnlock()
+	return sa.pending
+}
+
+// Remove one or more allocation asks from this application.
+// This also removes any reservations that are linked to the ask.
+// The return value is the number of reservations released
+func (sa *Application) RemoveAllocationAsk(allocKey string) int {
+	sa.Lock()
+	defer sa.Unlock()
+	// shortcut no need to do anything
+	if len(sa.requests) == 0 {
+		return 0
+	}
+	var deltaPendingResource *resources.Resource = nil
+	// when allocation key not specified, cleanup all allocation ask
+	var toRelease int
+	if allocKey == "" {
+		// cleanup all reservations
+		for key, reserve := range sa.reservations {
+			releases, err := sa.unReserveInternal(reserve.node, reserve.ask)
+			if err != nil {
+				log.Logger().Warn("Removal of reservation failed while removing all allocation asks",
+					zap.String("appID", sa.ApplicationID),
+					zap.String("reservationKey", key),
+					zap.Error(err))
+				continue
+			}
+			// clean up the queue reservation (one at a time)
+			sa.queue.UnReserve(sa.ApplicationID, releases)
+			toRelease += releases
+		}
+		// Cleanup total pending resource
+		deltaPendingResource = sa.pending
+		sa.pending = resources.NewResource()
+		sa.requests = make(map[string]*AllocationAsk)
+	} else {
+		// cleanup the reservation for this allocation
+		for _, key := range sa.IsAskReserved(allocKey) {
+			reserve := sa.reservations[key]
+			releases, err := sa.unReserveInternal(reserve.node, reserve.ask)
+			if err != nil {
+				log.Logger().Warn("Removal of reservation failed while removing allocation ask",
+					zap.String("appID", sa.ApplicationID),
+					zap.String("reservationKey", key),
+					zap.Error(err))
+				continue
+			}
+			// clean up the queue reservation
+			sa.queue.UnReserve(sa.ApplicationID, releases)
+			toRelease += releases
+		}
+		if ask := sa.requests[allocKey]; ask != nil {
+			deltaPendingResource = resources.MultiplyBy(ask.AllocatedResource, float64(ask.GetPendingAskRepeat()))
+			sa.pending.SubFrom(deltaPendingResource)
+			delete(sa.requests, allocKey)
+		}
+	}
+	// clean up the queue pending resources
+	sa.queue.decPendingResource(deltaPendingResource)
+	// Check if we need to change state based on the ask removal:
+	// 1) if pending is zero (no more asks left)
+	// 2) if confirmed allocations is zero (nothing is running)
+	// Change the state to waiting.
+	// When the resource trackers are zero we should not expect anything to come in later.
+	if resources.IsZero(sa.pending) && resources.IsZero(sa.allocatedResource) {
+		if err := sa.HandleApplicationEvent(waitApplication); err != nil {
+			log.Logger().Warn("Application state not changed to Waiting while updating ask(s)",
+				zap.String("currentState", sa.CurrentState()),
+				zap.Error(err))
+		}
+	}
+
+	return toRelease
+}
+
+// Add an allocation ask to this application
+// If the ask already exist update the existing info
+func (sa *Application) AddAllocationAsk(ask *AllocationAsk) error {
+	sa.Lock()
+	defer sa.Unlock()
+	if ask == nil {
+		return fmt.Errorf("ask cannot be nil when added to app %s", sa.ApplicationID)
+	}
+	if ask.GetPendingAskRepeat() == 0 || resources.IsZero(ask.AllocatedResource) {
+		return fmt.Errorf("invalid ask added to app %s: %v", sa.ApplicationID, ask)
+	}
+	ask.setQueue(sa.queue.QueuePath)
+	delta := resources.Multiply(ask.AllocatedResource, int64(ask.GetPendingAskRepeat()))
+
+	var oldAskResource *resources.Resource = nil
+	if oldAsk := sa.requests[ask.AllocationKey]; oldAsk != nil {
+		oldAskResource = resources.Multiply(oldAsk.AllocatedResource, int64(oldAsk.GetPendingAskRepeat()))
+	}
+
+	// Check if we need to change state based on the ask added, there are two cases:
+	// 1) first ask added on a new app: state is New
+	// 2) all asks and allocation have been removed: state is Waiting
+	// Move the state and get it scheduling (again)
+	if sa.stateMachine.Is(New.String()) || sa.stateMachine.Is(Waiting.String()) {
+		if err := sa.HandleApplicationEvent(runApplication); err != nil {
+			log.Logger().Debug("Application state change failed while adding new ask",
+				zap.String("currentState", sa.CurrentState()),
+				zap.Error(err))
+		}
+	}
+	sa.requests[ask.AllocationKey] = ask
+
+	// Update total pending resource
+	delta.SubFrom(oldAskResource)
+	sa.pending.AddTo(delta)
+	sa.queue.incPendingResource(delta)
+
+	return nil
+}
+
+// Add the ask when a node allocation is recovered. Maintaining the rule that an Allocation always has a
+// link to an AllocationAsk.
+// Safeguarded against a nil but the recovery generates the ask and should never be nil.
+func (sa *Application) RecoverAllocationAsk(ask *AllocationAsk) {
+	sa.Lock()
+	defer sa.Unlock()
+	if ask == nil {
+		return
+	}
+	ask.setQueue(sa.queue.QueuePath)
+	sa.requests[ask.AllocationKey] = ask
+}
+
+func (sa *Application) updateAskRepeat(allocKey string, delta int32) (*resources.Resource, error) {
+	sa.Lock()
+	defer sa.Unlock()
+	if ask := sa.requests[allocKey]; ask != nil {
+		return sa.updateAskRepeatInternal(ask, delta)
+	}
+	return nil, fmt.Errorf("failed to locate ask with key %s", allocKey)
+}
+
+func (sa *Application) updateAskRepeatInternal(ask *AllocationAsk, delta int32) (*resources.Resource, error) {
+	// updating with delta does error checking internally
+	if !ask.UpdatePendingAskRepeat(delta) {
+		return nil, fmt.Errorf("ask repeat not updated, resulting repeat less than zero for ask %s on app %s", ask.AllocationKey, sa.ApplicationID)
+	}
+
+	deltaPendingResource := resources.Multiply(ask.AllocatedResource, int64(delta))
+	sa.pending.AddTo(deltaPendingResource)
+	// update the pending of the queue with the same delta
+	sa.queue.incPendingResource(deltaPendingResource)
+
+	return deltaPendingResource, nil
+}
+
+// Return if the application has any reservations.
+func (sa *Application) hasReserved() bool {
+	sa.RLock()
+	defer sa.RUnlock()
+	return len(sa.reservations) > 0
+}
+
+// Return if the application has the node reserved.
+// An empty nodeID is never reserved.
+func (sa *Application) IsReservedOnNode(nodeID string) bool {
+	if nodeID == "" {
+		return false
+	}
+	sa.RLock()
+	defer sa.RUnlock()
+	for key := range sa.reservations {
+		if strings.HasPrefix(key, nodeID) {
+			return true
+		}
+	}
+	return false
+}
+
+// Reserve the application for this node and ask combination.
+// If the reservation fails the function returns an error, if the reservation is made it returns nil.
+// If the node and ask combination was already reserved for the application this is a noop and returns nil.
+func (sa *Application) Reserve(node *Node, ask *AllocationAsk) error {
+	sa.Lock()
+	defer sa.Unlock()
+	// create the reservation (includes nil checks)
+	nodeReservation := newReservation(node, sa, ask, true)
+	if nodeReservation == nil {
+		log.Logger().Debug("reservation creation failed unexpectedly",
+			zap.String("app", sa.ApplicationID),
+			zap.Any("node", node),
+			zap.Any("ask", ask))
+		return fmt.Errorf("reservation creation failed node or ask are nil on appID %s", sa.ApplicationID)
+	}
+	allocKey := ask.AllocationKey
+	if sa.requests[allocKey] == nil {
+		log.Logger().Debug("ask is not registered to this app",
+			zap.String("app", sa.ApplicationID),
+			zap.String("allocKey", allocKey))
+		return fmt.Errorf("reservation creation failed ask %s not found on appID %s", allocKey, sa.ApplicationID)
+	}
+	if !sa.canAskReserve(ask) {
+		return fmt.Errorf("reservation of ask exceeds pending repeat, pending ask repeat %d", ask.GetPendingAskRepeat())
+	}
+	// check if we can reserve the node before reserving on the app
+	if err := node.Reserve(sa, ask); err != nil {
+		return err
+	}
+	sa.reservations[nodeReservation.getKey()] = nodeReservation
+	log.Logger().Info("reservation added successfully", zap.String("node", node.NodeID),
+		zap.String("app", ask.ApplicationID), zap.String("ask", ask.AllocationKey))
+	return nil
+}
+
+// UnReserve the application for this node and ask combination.
+// This first removes the reservation from the node.
+// If the reservation does not exist it returns 0 for reservations removed, if the reservation is removed it returns 1.
+// The error is set if the reservation key cannot be removed from the app or node.
+func (sa *Application) UnReserve(node *Node, ask *AllocationAsk) (int, error) {
+	sa.Lock()
+	defer sa.Unlock()
+	return sa.unReserveInternal(node, ask)
+}
+
+// Unlocked version for UnReserve that really does the work.
+// Must only be called while holding the application lock.
+func (sa *Application) unReserveInternal(node *Node, ask *AllocationAsk) (int, error) {
+	resKey := reservationKey(node, nil, ask)
+	if resKey == "" {
+		log.Logger().Debug("unreserve reservation key create failed unexpectedly",
+			zap.String("appID", sa.ApplicationID),
+			zap.Any("node", node),
+			zap.Any("ask", ask))
+		return 0, fmt.Errorf("reservation key failed node or ask are nil for appID %s", sa.ApplicationID)
+	}
+	// unReserve the node before removing from the app
+	var num int
+	var err error
+	if num, err = node.unReserve(sa, ask); err != nil {
+		return 0, err
+	}
+	// if the unreserve worked on the node check the app
+	if _, found := sa.reservations[resKey]; found {
+		// unreserve worked on the node (no error) whether the reservation was found there or not, log the mismatch here
+		if num == 0 {
+			log.Logger().Info("reservation not found while removing from node, app has reservation",
+				zap.String("appID", sa.ApplicationID),
+				zap.String("nodeID", node.NodeID),
+				zap.String("ask", ask.AllocationKey))
+		}
+		delete(sa.reservations, resKey)
+		log.Logger().Info("reservation removed successfully", zap.String("node", node.NodeID),
+			zap.String("app", ask.ApplicationID), zap.String("ask", ask.AllocationKey))
+		return 1, nil
+	}
+	// reservation was not found
+	log.Logger().Info("reservation not found while removing from app",
+		zap.String("appID", sa.ApplicationID),
+		zap.String("nodeID", node.NodeID),
+		zap.String("ask", ask.AllocationKey),
+		zap.Int("nodeReservationsRemoved", num))
+	return 0, nil
+}
+
+// Return the allocation reservations on any node.
+// The returned array is 0 or more keys into the reservations map.
+// No locking, must only be called while holding the application lock
+func (sa *Application) IsAskReserved(allocKey string) []string {
+	reservationKeys := make([]string, 0)
+	if allocKey == "" {
+		return reservationKeys
+	}
+	for key := range sa.reservations {
+		if strings.HasSuffix(key, allocKey) {
+			reservationKeys = append(reservationKeys, key)
+		}
+	}
+	return reservationKeys
+}
+
+// Check if the allocation has already been reserved. An ask can reserve multiple nodes if the request has a repeat set
+// larger than 1. It can never reserve more than the repeat number of nodes.
+// No locking, must only be called while holding the application lock
+func (sa *Application) canAskReserve(ask *AllocationAsk) bool {
+	allocKey := ask.AllocationKey
+	pending := int(ask.GetPendingAskRepeat())
+	resNumber := sa.IsAskReserved(allocKey)
+	if len(resNumber) >= pending {
+		log.Logger().Debug("reservation exceeds repeats",
+			zap.String("askKey", allocKey),
+			zap.Int("askPending", pending),
+			zap.Int("askReserved", len(resNumber)))
+	}
+	return pending > len(resNumber)
+}
+
+// Sort the request for the app in order based on the priority of the request.
+// The sorted list only contains candidates that have an outstanding repeat.
+// No locking, must only be called while holding the application lock
+func (sa *Application) sortRequests(ascending bool) {
+	sa.sortedRequests = nil
+	for _, request := range sa.requests {
+		if request.GetPendingAskRepeat() == 0 {
+			continue
+		}
+		sa.sortedRequests = append(sa.sortedRequests, request)
+	}
+	// we might not have any requests
+	if len(sa.sortedRequests) > 0 {
+		sortAskByPriority(sa.sortedRequests, ascending)
+	}
+}
+
+func (sa *Application) getOutstandingRequests(headRoom *resources.Resource, total *[]*AllocationAsk) {
+	sa.RLock()
+	defer sa.RUnlock()
+
+	// make sure the requests are sorted
+	sa.sortRequests(false)
+	for _, request := range sa.sortedRequests {
+		if headRoom == nil || resources.FitIn(headRoom, request.AllocatedResource) {
+			// if headroom is still enough for the resources
+			*total = append(*total, request)
+			if headRoom != nil {
+				headRoom.SubFrom(request.AllocatedResource)
+			}
+		}
+	}
+}
+
+// Try a regular allocation of the pending requests
+func (sa *Application) tryAllocate(headRoom *resources.Resource, nodeIterator func() interfaces.NodeIterator) *Allocation {
+	sa.Lock()
+	defer sa.Unlock()
+	// make sure the requests are sorted
+	sa.sortRequests(false)
+	// get all the requests from the app sorted in order
+	for _, request := range sa.sortedRequests {
+		// resource must fit in headroom otherwise skip the request
+		if !resources.FitIn(headRoom, request.AllocatedResource) {
+			// post scheduling events via the event plugin
+			if eventCache := events.GetEventCache(); eventCache != nil {
+				message := fmt.Sprintf("Application %s does not fit into %s queue", request.ApplicationID, sa.QueueName)
+				if event, err := events.CreateRequestEventRecord(request.AllocationKey, request.ApplicationID, "InsufficientQueueResources", message); err != nil {
+					log.Logger().Warn("Event creation failed",
+						zap.String("event message", message),
+						zap.Error(err))
+				} else {
+					eventCache.AddEvent(event)
+				}
+			}
+			continue
+		}
+		iterator := nodeIterator()
+		if iterator != nil {
+			alloc := sa.tryNodes(request, iterator)
+			// have a candidate return it
+			if alloc != nil {
+				return alloc
+			}
+		}
+	}
+	// no requests fit, skip to next app
+	return nil
+}
+
+// Try a reserved allocation of an outstanding reservation
+func (sa *Application) tryReservedAllocate(headRoom *resources.Resource, nodeIterator func() interfaces.NodeIterator) *Allocation {
+	sa.Lock()
+	defer sa.Unlock()
+	// process all outstanding reservations and pick the first one that fits
+	for _, reserve := range sa.reservations {
+		ask := sa.requests[reserve.askKey]
+		// sanity check and cleanup if needed
+		if ask == nil || ask.GetPendingAskRepeat() == 0 {
+			var unreserveAsk *AllocationAsk
+			// if the ask was not found we need to construct one to unreserve
+			if ask == nil {
+				unreserveAsk = &AllocationAsk{
+					AllocationKey: reserve.askKey,
+					ApplicationID: sa.ApplicationID,
+				}
+			} else {
+				unreserveAsk = ask
+			}
+			// remove the reservation as this should not be reserved
+			alloc := newReservedAllocation(Unreserved, reserve.nodeID, unreserveAsk)
+			return alloc
+		}
+		// check if this fits in the queue's head room
+		if !resources.FitIn(headRoom, ask.AllocatedResource) {
+			continue
+		}
+		// check allocation possibility
+		alloc := sa.tryNode(reserve.node, ask)
+		// allocation worked fix the result and return
+		if alloc != nil {
+			alloc.Result = AllocatedReserved
+			return alloc
+		}
+	}
+	// let's try this on all other nodes
+	for _, reserve := range sa.reservations {
+		iterator := nodeIterator()
+		if iterator != nil {
+			alloc := sa.tryNodesNoReserve(reserve.ask, iterator, reserve.nodeID)
+			// have a candidate return it, including the node that was reserved
+			if alloc != nil {
+				return alloc
+			}
+		}
+	}
+	return nil
+}
+
+// Try all the nodes for a reserved request that have not been tried yet.
+// This should never result in a reservation as the ask is already reserved
+func (sa *Application) tryNodesNoReserve(ask *AllocationAsk, iterator interfaces.NodeIterator, reservedNode string) *Allocation {
+	for iterator.HasNext() {
+		node, ok := iterator.Next().(*Node)
+		if !ok {
+			log.Logger().Debug("Node iterator failed to return a node")
+			return nil
+		}
+		// skip over the node if the resource does not fit the node or this is the reserved node.
+		if !node.FitInNode(ask.AllocatedResource) || node.NodeID == reservedNode {
+			continue
+		}
+		alloc := sa.tryNode(node, ask)
+		// allocation worked: update result and return
+		if alloc != nil {
+			alloc.ReservedNodeID = reservedNode
+			alloc.Result = AllocatedReserved
+			return alloc
+		}
+	}
+	// ask does not fit, skip to next ask
+	return nil
+}
+
+// Try all the nodes for a request. The result is an allocation or reservation of a node.
+// New allocations can only be reserved after a delay.
+func (sa *Application) tryNodes(ask *AllocationAsk, iterator interfaces.NodeIterator) *Allocation {
+	var nodeToReserve *Node
+	scoreReserved := math.Inf(1)
+	// check if the ask is reserved or not
+	allocKey := ask.AllocationKey
+	reservedAsks := sa.IsAskReserved(allocKey)
+	allowReserve := len(reservedAsks) < int(ask.pendingRepeatAsk)
+	for iterator.HasNext() {
+		node, ok := iterator.Next().(*Node)
+		if !ok {
+			log.Logger().Debug("Node iterator failed to return a node")
+			return nil
+		}
+		// skip over the node if the resource does not fit the node at all.
+		if !node.FitInNode(ask.AllocatedResource) {
+			continue
+		}
+		alloc := sa.tryNode(node, ask)
+		// allocation worked so return
+		if alloc != nil {
+			// check if the node was reserved for this ask: if it is set the result and return
+			// NOTE: this is a safeguard as reserved nodes should never be part of the iterator
+			// but we have no locking
+			if _, ok = sa.reservations[reservationKey(node, nil, ask)]; ok {
+				log.Logger().Debug("allocate found reserved ask during non reserved allocate",
+					zap.String("appID", sa.ApplicationID),
+					zap.String("nodeID", node.NodeID),
+					zap.String("allocationKey", allocKey))
+				alloc.Result = AllocatedReserved
+				return alloc
+			}
+			// we could also have a different node reserved for this ask: if so, pick one of
+			// the reserved nodes to unreserve (first one in the list)
+			if len(reservedAsks) > 0 {
+				nodeID := strings.TrimSuffix(reservedAsks[0], "|"+allocKey)
+				log.Logger().Debug("allocate picking reserved ask during non reserved allocate",
+					zap.String("appID", sa.ApplicationID),
+					zap.String("nodeID", nodeID),
+					zap.String("allocationKey", allocKey))
+				alloc.Result = AllocatedReserved
+				alloc.ReservedNodeID = nodeID
+				return alloc
+			}
+			// nothing reserved just return this as a normal alloc
+			return alloc
+		}
+		// nothing allocated should we look at a reservation?
+		// TODO make this smarter a hardcoded delay is not the right thing
+		askAge := time.Since(ask.GetCreateTime())
+		if allowReserve && askAge > reservationDelay {
+			log.Logger().Debug("app reservation check",
+				zap.String("allocationKey", allocKey),
+				zap.Time("createTime", ask.GetCreateTime()),
+				zap.Duration("askAge", askAge),
+				zap.Duration("reservationDelay", reservationDelay))
+			score := ask.AllocatedResource.FitInScore(node.GetAvailableResource())
+			// Record the so-far best node to reserve
+			if score < scoreReserved {
+				scoreReserved = score
+				nodeToReserve = node
+			}
+		}
+	}
+	// we have not allocated yet, check if we should reserve
+	// NOTE: the node should not be reserved as the iterator filters them but we do not lock the nodes
+	if nodeToReserve != nil && !nodeToReserve.IsReserved() {
+		log.Logger().Debug("found candidate node for app reservation",
+			zap.String("appID", sa.ApplicationID),
+			zap.String("nodeID", nodeToReserve.NodeID),
+			zap.String("allocationKey", allocKey),
+			zap.Int("reservations", len(reservedAsks)),
+			zap.Int32("pendingRepeats", ask.pendingRepeatAsk))
+		// skip the node if conditions can not be satisfied
+		if !nodeToReserve.preReserveConditions(allocKey) {
+			return nil
+		}
+		// return reservation allocation and mark it as a reservation
+		alloc := newReservedAllocation(Reserved, nodeToReserve.NodeID, ask)
+		return alloc
+	}
+	// ask does not fit, skip to next ask
+	return nil
+}
+
+// Try allocating on one specific node
+func (sa *Application) tryNode(node *Node, ask *AllocationAsk) *Allocation {
+	allocKey := ask.AllocationKey
+	toAllocate := ask.AllocatedResource
+	// create the key for the reservation
+	if err := node.preAllocateCheck(toAllocate, reservationKey(nil, sa, ask), false); err != nil {
+		// skip schedule onto node
+		return nil
+	}
+	// skip the node if conditions can not be satisfied
+	if !node.preAllocateConditions(allocKey) {
+		return nil
+	}
+	// everything OK really allocate
+	alloc := NewAllocation(common.GetNewUUID(), node.NodeID, ask)
+	if node.AddAllocation(alloc) {
+		// mark this ask as allocated by lowering the repeat
+		_, err := sa.updateAskRepeatInternal(ask, -1)
+		if err != nil {
+			log.Logger().Debug("ask repeat update failed unexpectedly",
+				zap.Error(err))

Review comment:
       have we ever run into this before?
   right now it just logs a DEBUG message (I think this at least needs to be WARN, because when this happens it means something really bad is going on). If it fails to update the repeat, can we just stop and return without making a further allocation?
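
To make the suggestion concrete, a minimal, self-contained sketch (hypothetical stand-in types and a plain print instead of zap, not the PR's code) of the control flow being asked for: log at WARN and abandon this allocation instead of carrying on. Rolling back the node-side allocation is an extra assumption here about what "stop and return" would need so the node and the app stay consistent:

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical stand-ins, not the scheduler's objects; only the error-path
// control flow matters for this sketch.
type allocation struct{ uuid string }

type node struct{ allocs map[string]*allocation }

func (n *node) addAllocation(a *allocation) bool { n.allocs[a.uuid] = a; return true }
func (n *node) removeAllocation(uuid string)     { delete(n.allocs, uuid) }

// updateAskRepeat stands in for updateAskRepeatInternal and is forced to fail
// so the error path is exercised.
func updateAskRepeat() error { return errors.New("repeat would drop below zero") }

// tryNode sketches the suggested shape: if the repeat cannot be lowered, warn,
// undo the node-side change and give up on this allocation.
func tryNode(n *node) *allocation {
	alloc := &allocation{uuid: "alloc-1"}
	if n.addAllocation(alloc) {
		if err := updateAskRepeat(); err != nil {
			// WARN level with zap in the real code; a plain print here.
			fmt.Println("WARN: ask repeat update failed unexpectedly:", err)
			n.removeAllocation(alloc.uuid) // keep node and app bookkeeping consistent
			return nil
		}
		return alloc
	}
	return nil
}

func main() {
	n := &node{allocs: map[string]*allocation{}}
	if tryNode(n) == nil {
		fmt.Println("allocation abandoned, node allocations left:", len(n.allocs))
	}
}
```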

##########
File path: pkg/scheduler/partition.go
##########
@@ -0,0 +1,1101 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package scheduler
+
+import (
+	"fmt"
+	"math"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/looplab/fsm"
+	"go.uber.org/zap"
+
+	"github.com/apache/incubator-yunikorn-core/pkg/common"
+	"github.com/apache/incubator-yunikorn-core/pkg/common/configs"
+	"github.com/apache/incubator-yunikorn-core/pkg/common/resources"
+	"github.com/apache/incubator-yunikorn-core/pkg/common/security"
+	"github.com/apache/incubator-yunikorn-core/pkg/interfaces"
+	"github.com/apache/incubator-yunikorn-core/pkg/log"
+	"github.com/apache/incubator-yunikorn-core/pkg/metrics"
+	"github.com/apache/incubator-yunikorn-core/pkg/scheduler/objects"
+	"github.com/apache/incubator-yunikorn-core/pkg/scheduler/placement"
+	"github.com/apache/incubator-yunikorn-core/pkg/scheduler/policies"
+	"github.com/apache/incubator-yunikorn-core/pkg/webservice/dao"
+	"github.com/apache/incubator-yunikorn-scheduler-interface/lib/go/si"
+)
+
+type PartitionContext struct {
+	RmID string // the RM the partition belongs to
+	Name string // name of the partition (logging mainly)
+
+	// Private fields need protection
+	root                   *objects.Queue                  // start of the queue hierarchy
+	applications           map[string]*objects.Application // applications assigned to this partition
+	reservedApps           map[string]int                  // applications reserved within this partition, with reservation count
+	nodes                  map[string]*objects.Node        // nodes assigned to this partition
+	allocations            map[string]*objects.Allocation  // allocations
+	placementManager       *placement.AppPlacementManager  // placement manager for this partition
+	partitionManager       *partitionManager               // manager for this partition
+	stateMachine           *fsm.FSM                        // the state of the partition for scheduling
+	stateTime              time.Time                       // last time the state was updated (needed for cleanup)
+	isPreemptable          bool                            // can allocations be preempted
+	rules                  *[]configs.PlacementRule        // placement rules to be loaded by the scheduler
+	userGroupCache         *security.UserGroupCache        // user cache per partition
+	totalPartitionResource *resources.Resource             // Total node resources
+	nodeSortingPolicy      *policies.NodeSortingPolicy     // Global Node Sorting Policies
+
+	sync.RWMutex
+}
+
+func newPartitionContext(conf configs.PartitionConfig, rmID string, cc *ClusterContext) (*PartitionContext, error) {
+	if conf.Name == "" || rmID == "" {
+		log.Logger().Info("partition cannot be created",
+			zap.String("partition name", conf.Name),
+			zap.String("rmID", rmID),
+			zap.Any("cluster context", cc))
+		return nil, fmt.Errorf("partition cannot be created without name or RM, one is not set")
+	}
+	pc := &PartitionContext{
+		Name:         conf.Name,
+		RmID:         rmID,
+		stateMachine: objects.NewObjectState(),
+		stateTime:    time.Now(),
+		applications: make(map[string]*objects.Application),
+		reservedApps: make(map[string]int),
+		nodes:        make(map[string]*objects.Node),
+		allocations:  make(map[string]*objects.Allocation),
+	}
+	pc.partitionManager = &partitionManager{
+		pc: pc,
+		cc: cc,
+	}
+	if err := pc.initialPartitionFromConfig(conf); err != nil {
+		return nil, err
+	}
+	return pc, nil
+}
+
+// Initialise the partition
+func (pc *PartitionContext) initialPartitionFromConfig(conf configs.PartitionConfig) error {
+	if len(conf.Queues) == 0 || conf.Queues[0].Name != configs.RootQueue {
+		return fmt.Errorf("partition cannot be created without root queue")
+	}
+
+	// Set up the queue structure: root first, it should be the only queue at this level
+	// Add the rest of the queue structure recursively
+	queueConf := conf.Queues[0]
+	var err error
+	if pc.root, err = objects.NewConfiguredQueue(queueConf, nil); err != nil {
+		return err
+	}
+	// recursively add the queues to the root
+	if err = pc.addQueue(queueConf.Queues, pc.root); err != nil {
+		return err
+	}
+	log.Logger().Info("root queue added",
+		zap.String("partitionName", pc.Name),
+		zap.String("rmID", pc.RmID))
+
+	// set preemption needed flag
+	pc.isPreemptable = conf.Preemption.Enabled
+
+	pc.rules = &conf.PlacementRules
+	// We need to pass in the unlocked version of the getQueue function.
+	// Placing an application will already have a lock on the partition context.
+	pc.placementManager = placement.NewPlacementManager(*pc.rules, pc.getQueue)
+	// get the user group cache for the partition
+	// TODO get the resolver from the config
+	pc.userGroupCache = security.GetUserGroupCache("")
+
+	// TODO Need a cleaner interface here.
+	var configuredPolicy policies.SortingPolicy
+	configuredPolicy, err = policies.FromString(conf.NodeSortPolicy.Type)
+	if err != nil {
+		log.Logger().Debug("NodeSorting policy incorrectly set or unknown",
+			zap.Error(err))
+	}
+	switch configuredPolicy {
+	case policies.BinPackingPolicy, policies.FairnessPolicy:
+		log.Logger().Info("NodeSorting policy set from config",
+			zap.String("policyName", configuredPolicy.String()))
+		pc.nodeSortingPolicy = policies.NewNodeSortingPolicy(conf.NodeSortPolicy.Type)
+	case policies.Unknown:
+		log.Logger().Info("NodeSorting policy not set using 'fair' as default")
+		pc.nodeSortingPolicy = policies.NewNodeSortingPolicy("fair")
+	}
+	return nil
+}
+
+func (pc *PartitionContext) updatePartitionDetails(conf configs.PartitionConfig) error {
+	pc.Lock()
+	defer pc.Unlock()
+	if len(conf.Queues) == 0 || conf.Queues[0].Name != configs.RootQueue {
+		return fmt.Errorf("partition cannot be created without root queue")
+	}
+
+	if pc.placementManager.IsInitialised() {
+		log.Logger().Info("Updating placement manager rules on config reload")
+		err := pc.placementManager.UpdateRules(conf.PlacementRules)
+		if err != nil {
+			log.Logger().Info("New placement rules not activated, config reload failed", zap.Error(err))
+			return err
+		}
+		pc.rules = &conf.PlacementRules
+	} else {
+		log.Logger().Info("Creating new placement manager on config reload")
+		pc.rules = &conf.PlacementRules
+		// We need to pass in the unlocked version of the getQueue function.
+		// Placing an application will already have a lock on the partition context.
+		pc.placementManager = placement.NewPlacementManager(*pc.rules, pc.getQueue)
+	}
+	// start at the root: there is only one queue
+	queueConf := conf.Queues[0]
+	root := pc.root
+	// update the root queue
+	if err := root.SetQueueConfig(queueConf); err != nil {
+		return err
+	}
+	root.UpdateSortType()
+	// update the rest of the queues recursively
+	return pc.updateQueues(queueConf.Queues, root)
+}
+
+// Process the config structure and create a queue info tree for this partition
+func (pc *PartitionContext) addQueue(conf []configs.QueueConfig, parent *objects.Queue) error {
+	// create the queue at this level
+	for _, queueConf := range conf {
+		thisQueue, err := objects.NewConfiguredQueue(queueConf, parent)
+		if err != nil {
+			return err
+		}
+		// recursively create the queues below
+		if len(queueConf.Queues) > 0 {
+			err = pc.addQueue(queueConf.Queues, thisQueue)
+			if err != nil {
+				return err
+			}
+		}
+	}
+	return nil
+}
+
+// Update the passed in queues and then do this recursively for the children
+//
+// NOTE: this is a lock free call. It should only be called holding the PartitionContext lock.
+func (pc *PartitionContext) updateQueues(config []configs.QueueConfig, parent *objects.Queue) error {
+	// get the name of the passed in queue
+	parentPath := parent.QueuePath + configs.DOT
+	// keep track of which children we have updated
+	visited := map[string]bool{}
+	// walk over the queues recursively
+	for _, queueConfig := range config {
+		pathName := parentPath + queueConfig.Name
+		queue := pc.getQueue(pathName)
+		var err error
+		if queue == nil {
+			queue, err = objects.NewConfiguredQueue(queueConfig, parent)
+		} else {
+			err = queue.SetQueueConfig(queueConfig)
+		}
+		if err != nil {
+			return err
+		}
+		// special call to convert to a real policy from the property
+		queue.UpdateSortType()
+		if err = pc.updateQueues(queueConfig.Queues, queue); err != nil {
+			return err
+		}
+		visited[queue.Name] = true
+	}
+	// remove all children that were not visited
+	for childName, childQueue := range parent.GetCopyOfChildren() {
+		if !visited[childName] {
+			childQueue.MarkQueueForRemoval()
+		}
+	}
+	return nil
+}
+
+// Mark the partition for removal from the system.
+// This can be executed multiple times and is only effective the first time.
+// The current cleanup sequence is "immediate". This is implemented to allow a graceful cleanup.
+func (pc *PartitionContext) markPartitionForRemoval() {
+	if err := pc.handlePartitionEvent(objects.Remove); err != nil {
+		log.Logger().Error("failed to mark partition for deletion",
+			zap.String("partitionName", pc.Name),
+			zap.Error(err))
+	}
+}
+
+// Get the state of the partition.
+// No new nodes and applications will be accepted if stopped or being removed.
+func (pc *PartitionContext) isDraining() bool {
+	return pc.stateMachine.Current() == objects.Draining.String()
+}
+
+func (pc *PartitionContext) isRunning() bool {
+	return pc.stateMachine.Current() == objects.Active.String()
+}
+
+func (pc *PartitionContext) isStopped() bool {
+	return pc.stateMachine.Current() == objects.Stopped.String()
+}
+
+// Handle the state event for the partition.
+// The state machine handles the locking.
+func (pc *PartitionContext) handlePartitionEvent(event objects.ObjectEvent) error {
+	err := pc.stateMachine.Event(event.String(), pc.Name)
+	if err == nil {
+		pc.stateTime = time.Now()
+		return nil
+	}
+	// handle the same state transition not nil error (limit of fsm).
+	if err.Error() == "no transition" {
+		return nil
+	}
+	return err
+}
+
+// Add a new application to the partition.
+func (pc *PartitionContext) AddApplication(app *objects.Application) error {
+	pc.Lock()
+	defer pc.Unlock()
+
+	if pc.isDraining() || pc.isStopped() {
+		return fmt.Errorf("partition %s is stopped cannot add a new application %s", pc.Name, app.ApplicationID)
+	}
+
+	// Add to applications
+	appID := app.ApplicationID
+	if pc.applications[appID] != nil {
+		return fmt.Errorf("adding application %s to partition %s, but application already existed", appID, pc.Name)
+	}
+
+	// Put app under the queue
+	queueName := app.QueueName
+	if pc.placementManager.IsInitialised() {
+		err := pc.placementManager.PlaceApplication(app)
+		if err != nil {
+			return fmt.Errorf("failed to place application %s: %v", appID, err)
+		}
+		queueName = app.QueueName
+		if queueName == "" {
+			return fmt.Errorf("application rejected by placement rules: %s", appID)
+		}
+	}
+	// we have a queue name either from placement or direct, get the queue
+	queue := pc.getQueue(queueName)
+	if queue == nil {
+		// queue must exist if not using placement rules
+		if !pc.placementManager.IsInitialised() {
+			return fmt.Errorf("application '%s' rejected, cannot create queue '%s' without placement rules", appID, queueName)
+		}
+		// with placement rules the hierarchy might not exist so try and create it
+		var err error
+		queue, err = pc.createQueue(queueName, app.GetUser())
+		if err != nil {
+			return fmt.Errorf("failed to create rule based queue %s for application %s", queueName, appID)
+		}
+	}
+	// check the queue: is a leaf queue with submit access
+	if !queue.IsLeafQueue() || !queue.CheckSubmitAccess(app.GetUser()) {
+		return fmt.Errorf("failed to find queue %s for application %s", queueName, appID)
+	}
+
+	// all is OK update the app and partition
+	app.SetQueue(queue)
+	queue.AddApplication(app)
+	pc.applications[appID] = app
+
+	return nil
+}
+
+// Remove the application from the partition.
+// This does not fail and handles missing /app/queue/node/allocations internally
+func (pc *PartitionContext) removeApplication(appID string) []*objects.Allocation {
+	pc.Lock()
+	defer pc.Unlock()
+
+	// Remove from applications map
+	if pc.applications[appID] == nil {
+		return nil
+	}
+	app := pc.applications[appID]
+	// remove from partition then cleanup underlying objects
+	delete(pc.applications, appID)
+	delete(pc.reservedApps, appID)
+
+	queueName := app.QueueName
+	// Remove all asks and thus all reservations and pending resources (queue included)
+	_ = app.RemoveAllocationAsk("")
+	// Remove app from queue
+	if queue := pc.getQueue(queueName); queue != nil {
+		queue.RemoveApplication(app)
+	}
+	// Remove all allocations
+	allocations := app.RemoveAllAllocations()
+	// Remove all allocations from nodes and the partition (queues have been updated already)
+	if len(allocations) != 0 {
+		for _, alloc := range allocations {
+			currentUUID := alloc.UUID
+			// Remove from partition
+			if globalAlloc := pc.allocations[currentUUID]; globalAlloc == nil {
+				log.Logger().Warn("unknown allocation: not found on the partition",
+					zap.String("appID", appID),
+					zap.String("allocationId", currentUUID))
+			} else {
+				delete(pc.allocations, currentUUID)
+			}
+
+			// Remove from node: even if not found on the partition to keep things clean
+			node := pc.nodes[alloc.NodeID]
+			if node == nil {
+				log.Logger().Warn("unknown node: not found in active node list",
+					zap.String("appID", appID),
+					zap.String("nodeID", alloc.NodeID))
+				continue
+			}
+			if nodeAlloc := node.RemoveAllocation(currentUUID); nodeAlloc == nil {
+				log.Logger().Warn("unknown allocation: not found on the node",
+					zap.String("appID", appID),
+					zap.String("allocationId", currentUUID),
+					zap.String("nodeID", alloc.NodeID))
+			}
+		}
+	}
+
+	log.Logger().Debug("application removed from the scheduler",
+		zap.String("queue", queueName),
+		zap.String("applicationID", appID))
+
+	return allocations
+}
+
+func (pc *PartitionContext) getApplication(appID string) *objects.Application {
+	pc.RLock()
+	defer pc.RUnlock()
+
+	return pc.applications[appID]
+}
+
+// Return a copy of the map of all reservations for the partition.
+// This will return an empty map if there are no reservations.
+// Visible for tests
+func (pc *PartitionContext) getReservations() map[string]int {
+	pc.RLock()
+	defer pc.RUnlock()
+	reserve := make(map[string]int)
+	for key, num := range pc.reservedApps {
+		reserve[key] = num
+	}
+	return reserve
+}
+
+// Get the queue from the structure based on the fully qualified name.
+// Wrapper around the unlocked version getQueue()
+// Visible by tests
+func (pc *PartitionContext) GetQueue(name string) *objects.Queue {
+	pc.RLock()
+	defer pc.RUnlock()
+	return pc.getQueue(name)
+}
+
+// Get the queue from the structure based on the fully qualified name.
+// The name is not syntax checked and must be valid.
+// Returns nil if the queue is not found otherwise the queue object.
+//
+// NOTE: this is a lock free call. It should only be called holding the PartitionContext lock.
+func (pc *PartitionContext) getQueue(name string) *objects.Queue {
+	// start at the root
+	queue := pc.root
+	part := strings.Split(strings.ToLower(name), configs.DOT)
+	// no input
+	if len(part) == 0 || part[0] != configs.RootQueue {
+		return nil
+	}
+	// walk over the parts going down towards the requested queue
+	for i := 1; i < len(part); i++ {
+		// if child not found break out and return
+		if queue = queue.GetChildQueue(part[i]); queue == nil {
+			break
+		}
+	}
+	return queue
+}
+
+// Get the queue info for the whole queue structure to pass to the webservice
+func (pc *PartitionContext) GetQueueInfos() dao.QueueDAOInfo {
+	return pc.root.GetQueueInfos()
+}
+
+// Create a queue with full hierarchy. This is called when a new queue is created from a placement rule.
+// The final leaf queue does not exist otherwise we would not get here.
+// This means that at least 1 queue (a leaf queue) will be created
+// NOTE: this is a lock free call. It should only be called holding the PartitionContext lock.
+func (pc *PartitionContext) createQueue(name string, user security.UserGroup) (*objects.Queue, error) {
+	// find the queue furthest down the hierarchy that exists
+	var toCreate []string
+	if !strings.HasPrefix(name, configs.RootQueue) || !strings.Contains(name, configs.DOT) {
+		return nil, fmt.Errorf("illegal queue name passed in: %s", name)
+	}
+	current := name
+	queue := pc.getQueue(current)
+	log.Logger().Debug("Checking queue creation")
+	for queue == nil {
+		toCreate = append(toCreate, current[strings.LastIndex(current, configs.DOT)+1:])
+		current = current[0:strings.LastIndex(current, configs.DOT)]
+		queue = pc.getQueue(current)
+	}
+	// Check the ACL before we really create
+	// The existing parent queue is the lowest we need to look at
+	if !queue.CheckSubmitAccess(user) {
+		return nil, fmt.Errorf("submit access to queue %s denied during create of: %s", current, name)
+	}
+	if queue.IsLeafQueue() {
+		return nil, fmt.Errorf("creation of queue %s failed parent is already a leaf: %s", name, current)
+	}
+	log.Logger().Debug("Creating queue(s)",
+		zap.String("parent", current),
+		zap.String("fullPath", name))
+	for i := len(toCreate) - 1; i >= 0; i-- {
+		// everything is checked and there should be no errors
+		var err error
+		queue, err = objects.NewDynamicQueue(toCreate[i], i == 0, queue)
+		if err != nil {
+			log.Logger().Warn("Queue auto create failed unexpectedly",
+				zap.String("queueName", toCreate[i]),
+				zap.Error(err))
+			return nil, err
+		}
+	}
+	return queue, nil
+}
+
+// Get a node from the partition by nodeID.
+func (pc *PartitionContext) GetNode(nodeID string) *objects.Node {
+	pc.RLock()
+	defer pc.RUnlock()
+
+	return pc.nodes[nodeID]
+}
+
+// Get a copy of the nodes from the partition.
+// This list does not include reserved nodes or nodes marked unschedulable
+func (pc *PartitionContext) getSchedulableNodes() []*objects.Node {
+	return pc.getNodes(true)
+}
+
+// Get a copy of the nodes from the partition.
+// Excludes unschedulable nodes only, reserved node inclusion depends on the parameter passed in.
+func (pc *PartitionContext) getNodes(excludeReserved bool) []*objects.Node {
+	pc.RLock()
+	defer pc.RUnlock()
+
+	nodes := make([]*objects.Node, 0)
+	for _, node := range pc.nodes {
+		// filter out the nodes that are not scheduling
+		if !node.IsSchedulable() || (excludeReserved && node.IsReserved()) {
+			continue
+		}
+		nodes = append(nodes, node)
+	}
+	return nodes
+}
+
+// Add the node to the partition and process the allocations that are reported by the node.
+func (pc *PartitionContext) AddNode(node *objects.Node, existingAllocations []*objects.Allocation) error {
+	if node == nil {
+		return fmt.Errorf("cannot add 'nil' node to partition %s", pc.Name)
+	}
+	pc.Lock()
+	defer pc.Unlock()
+
+	if pc.isDraining() || pc.isStopped() {
+		return fmt.Errorf("partition %s is stopped cannot add a new node %s", pc.Name, node.NodeID)
+	}
+
+	if pc.nodes[node.NodeID] != nil {
+		return fmt.Errorf("partition %s has an existing node %s, node name must be unique", pc.Name, node.NodeID)
+	}
+
+	log.Logger().Debug("adding node to partition",
+		zap.String("nodeID", node.NodeID),
+		zap.String("partition", pc.Name))
+
+	// update the resources available in the cluster
+	if pc.totalPartitionResource == nil {
+		pc.totalPartitionResource = node.GetCapacity().Clone()
+	} else {
+		pc.totalPartitionResource.AddTo(node.GetCapacity())
+	}
+	pc.root.SetMaxResource(pc.totalPartitionResource)
+
+	// Node is added to the system to allow processing of the allocations
+	pc.nodes[node.NodeID] = node
+	// Add allocations that exist on the node when added
+	if len(existingAllocations) > 0 {
+		log.Logger().Info("add existing allocations",
+			zap.String("nodeID", node.NodeID),
+			zap.Int("existingAllocations", len(existingAllocations)))
+		for current, alloc := range existingAllocations {
+			if err := pc.addAllocation(alloc); err != nil {
+				released := pc.removeNodeInternal(node.NodeID)
+				log.Logger().Info("failed to add existing allocations",
+					zap.String("nodeID", node.NodeID),
+					zap.Int("existingAllocations", len(existingAllocations)),
+					zap.Int("releasedAllocations", len(released)),
+					zap.Int("processingAlloc", current))
+				metrics.GetSchedulerMetrics().IncFailedNodes()
+				return err
+			}
+		}
+	}
+
+	// Node is added update the metrics
+	metrics.GetSchedulerMetrics().IncActiveNodes()
+	log.Logger().Info("added node to partition",
+		zap.String("nodeID", node.NodeID),
+		zap.String("partition", pc.Name))
+
+	return nil
+}
+
+// Remove a node from the partition. It returns all removed allocations.
+func (pc *PartitionContext) removeNode(nodeID string) []*objects.Allocation {
+	pc.Lock()
+	defer pc.Unlock()
+	return pc.removeNodeInternal(nodeID)
+}
+
+// Remove a node from the partition. It returns all removed allocations.
+// Unlocked version must be called holding the partition lock.
+func (pc *PartitionContext) removeNodeInternal(nodeID string) []*objects.Allocation {
+	log.Logger().Info("remove node from partition",
+		zap.String("nodeID", nodeID),
+		zap.String("partition", pc.Name))
+
+	node := pc.nodes[nodeID]
+	if node == nil {
+		log.Logger().Debug("node was not found",
+			zap.String("nodeID", nodeID),
+			zap.String("partitionName", pc.Name))
+		return nil
+	}
+
+	// Remove node from list of tracked nodes
+	delete(pc.nodes, nodeID)
+	metrics.GetSchedulerMetrics().DecActiveNodes()
+
+	// found the node, clean up the node and all linked data
+	released := pc.removeNodeAllocations(node)
+	pc.totalPartitionResource.SubFrom(node.GetCapacity())
+	pc.root.SetMaxResource(pc.totalPartitionResource)
+
+	// unreserve all the apps that were reserved on the node
+	reservedKeys, releasedAsks := node.UnReserveApps()
+	// update the partition reservations based on the node clean up
+	for i, appID := range reservedKeys {
+		pc.unReserveCount(appID, releasedAsks[i])
+	}
+
+	log.Logger().Info("node removed",
+		zap.String("partitionName", pc.Name),
+		zap.String("nodeID", node.NodeID))
+	return released
+}
+
+// Remove all allocations that are assigned to a node as part of the node removal. This is not part of the node object
+// as updating the applications and queues is the only goal. Applications and queues are not accessible from the node.
+// The removed allocations are returned.
+func (pc *PartitionContext) removeNodeAllocations(node *objects.Node) []*objects.Allocation {
+	released := make([]*objects.Allocation, 0)
+	// walk over all allocations still registered for this node
+	for _, alloc := range node.GetAllAllocations() {
+		allocID := alloc.UUID
+		// since we are not locking the node and or application we could have had an update while processing
+		// note that we do not return the allocation if the app or allocation is not found and assume that it
+		// was already removed
+		app := pc.applications[alloc.ApplicationID]
+		if app == nil {
+			log.Logger().Info("app is not found, skipping while removing the node",
+				zap.String("appID", alloc.ApplicationID),
+				zap.String("nodeID", node.NodeID))
+			continue
+		}
+		// check allocations on the app
+		if app.RemoveAllocation(allocID) == nil {
+			log.Logger().Info("allocation is not found, skipping while removing the node",
+				zap.String("allocationId", allocID),
+				zap.String("appID", app.ApplicationID),
+				zap.String("nodeID", node.NodeID))
+			continue
+		}
+		if err := app.GetQueue().DecAllocatedResource(alloc.AllocatedResource); err != nil {
+			log.Logger().Warn("failed to release resources from queue",
+				zap.String("appID", alloc.ApplicationID),
+				zap.Error(err))
+		}
+
+		// the allocation is removed so add it to the list that we return
+		released = append(released, alloc)
+		log.Logger().Info("allocation removed",
+			zap.String("allocationId", allocID),
+			zap.String("nodeID", node.NodeID))
+	}
+	return released
+}
+
+func (pc *PartitionContext) calculateOutstandingRequests() []*objects.AllocationAsk {
+	if !resources.StrictlyGreaterThanZero(pc.root.GetPendingResource()) {
+		return nil
+	}
+	outstanding := make([]*objects.AllocationAsk, 0)
+	pc.root.GetQueueOutstandingRequests(&outstanding)
+	return outstanding
+}
+
+// Try regular allocation for the partition
+// Lock free call: all locks are taken when needed in the called functions
+func (pc *PartitionContext) tryAllocate() *objects.Allocation {
+	if !resources.StrictlyGreaterThanZero(pc.root.GetPendingResource()) {
+		// nothing to do just return
+		return nil
+	}
+	// try allocating from the root down
+	alloc := pc.root.TryAllocate(pc.GetNodeIterator)
+	if alloc != nil {
+		return pc.allocate(alloc)
+	}
+	return nil
+}
+
+// Try process reservations for the partition
+// Lock free call: all locks are taken when needed in the called functions
+func (pc *PartitionContext) tryReservedAllocate() *objects.Allocation {
+	if !resources.StrictlyGreaterThanZero(pc.root.GetPendingResource()) {
+		// nothing to do just return
+		return nil
+	}
+	// try allocating from the root down
+	alloc := pc.root.TryReservedAllocate(pc.GetNodeIterator)
+	if alloc != nil {
+		return pc.allocate(alloc)
+	}
+	return nil
+}
+
+// Process the allocation and make the left over changes in the partition.
+func (pc *PartitionContext) allocate(alloc *objects.Allocation) *objects.Allocation {
+	pc.Lock()
+	defer pc.Unlock()
+	// partition is locked nothing can change from now on
+	// find the app make sure it still exists
+	appID := alloc.ApplicationID
+	app := pc.applications[appID]
+	if app == nil {
+		log.Logger().Info("Application was removed while allocating",
+			zap.String("appID", appID))
+		return nil
+	}
+	// find the node make sure it still exists
+	// if the node was passed in use that ID instead of the one from the allocation
+	// the node ID is set when a reservation is allocated on a non-reserved node
+	var nodeID string
+	if alloc.ReservedNodeID == "" {
+		nodeID = alloc.NodeID
+	} else {
+		nodeID = alloc.ReservedNodeID
+		log.Logger().Debug("Reservation allocated on different node",
+			zap.String("current node", alloc.NodeID),
+			zap.String("reserved node", nodeID),
+			zap.String("appID", appID))
+	}
+	node := pc.nodes[nodeID]
+	if node == nil {
+		log.Logger().Info("Node was removed while allocating",
+			zap.String("nodeID", nodeID),
+			zap.String("appID", appID))
+		return nil
+	}
+	// reservation
+	if alloc.Result == objects.Reserved {
+		pc.reserve(app, node, alloc.Ask)
+		return nil
+	}
+	// unreserve
+	if alloc.Result == objects.Unreserved || alloc.Result == objects.AllocatedReserved {
+		pc.unReserve(app, node, alloc.Ask)
+		if alloc.Result == objects.Unreserved {
+			return nil
+		}
+		// remove the link to the reserved node
+		alloc.ReservedNodeID = ""
+	}
+
+	// Safeguard against the unlikely case that we have clashes.
+	// A clash points to entropy issues on the node.
+	if _, found := pc.allocations[alloc.UUID]; found {
+		for {
+			allocationUUID := common.GetNewUUID()
+			log.Logger().Warn("UUID clash, random generator might be lacking entropy",
+				zap.String("uuid", alloc.UUID),
+				zap.String("new UUID", allocationUUID))
+			if pc.allocations[allocationUUID] == nil {
+				alloc.UUID = allocationUUID
+				break
+			}
+		}
+	}
+	pc.allocations[alloc.UUID] = alloc
+	log.Logger().Info("scheduler allocation processed",
+		zap.String("appID", alloc.ApplicationID),
+		zap.String("allocationKey", alloc.AllocationKey),
+		zap.String("allocatedResource", alloc.AllocatedResource.String()),
+		zap.String("targetNode", alloc.NodeID))
+	// pass the allocation back to the RM via the cluster context
+	return alloc
+}
+
+// Process the reservation in the scheduler
+// Lock free call: this must be called holding the context lock
+func (pc *PartitionContext) reserve(app *objects.Application, node *objects.Node, ask *objects.AllocationAsk) {
+	appID := app.ApplicationID
+	// app has node already reserved cannot reserve again
+	if app.IsReservedOnNode(node.NodeID) {
+		log.Logger().Info("Application is already reserved on node",
+			zap.String("appID", appID),
+			zap.String("nodeID", node.NodeID))
+		return
+	}
+	// all ok, add the reservation to the app, this will also reserve the node
+	if err := app.Reserve(node, ask); err != nil {
+		log.Logger().Debug("Failed to handle reservation, error during update of app",
+			zap.Error(err))
+		return
+	}
+
+	// add the reservation to the queue list
+	app.GetQueue().Reserve(appID)
+	// increase the number of reservations for this app
+	pc.reservedApps[appID]++
+
+	log.Logger().Info("allocation ask is reserved",
+		zap.String("appID", ask.ApplicationID),
+		zap.String("queue", ask.QueueName),
+		zap.String("allocationKey", ask.AllocationKey),
+		zap.String("node", node.NodeID))
+}
+
+// Process the unreservation in the scheduler
+// Lock free call: this must be called holding the context lock
+func (pc *PartitionContext) unReserve(app *objects.Application, node *objects.Node, ask *objects.AllocationAsk) {
+	appID := app.ApplicationID
+	if pc.reservedApps[appID] == 0 {
+		log.Logger().Info("Application is not reserved in partition",
+			zap.String("appID", appID))
+		return
+	}
+	// all ok, remove the reservation of the app, this will also unReserve the node
+	var err error
+	var num int
+	if num, err = app.UnReserve(node, ask); err != nil {
+		log.Logger().Info("Failed to unreserve, error during allocate on the app",
+			zap.Error(err))
+		return

Review comment:
       when we fail to unreserve the ask on the node, we simply log an INFO message and return.
   does this mean that we made an `alloc` for this reservation and will notify the shim about the allocation, while the reservation was not deleted and is left over on the node?
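
   For illustration only, here is a minimal, self-contained sketch of the shape such a change could take: the unreserve helper returns an error instead of only logging, so the caller in `allocate` can abort before the allocation is sent back to the shim. The types below are simplified stand-ins, not the real `objects` package, and this is not what the PR currently does.

```go
package main

import "fmt"

// Simplified stand-ins for the ask, node and partition types; illustration only.
type ask struct{ allocationKey string }

type node struct {
	nodeID       string
	reservations map[string]*ask // allocationKey -> reserved ask
}

type partition struct {
	reservedApps map[string]int // appID -> reservation count
}

// unReserve returns an error instead of only logging, so the caller can decide
// not to confirm the allocation when the reservation could not be removed.
// Hypothetical variant for discussion only.
func (p *partition) unReserve(appID string, n *node, a *ask) error {
	if p.reservedApps[appID] == 0 {
		return fmt.Errorf("app %s has no reservation in this partition", appID)
	}
	if _, ok := n.reservations[a.allocationKey]; !ok {
		return fmt.Errorf("reservation %s not found on node %s", a.allocationKey, n.nodeID)
	}
	delete(n.reservations, a.allocationKey)
	p.reservedApps[appID]--
	if p.reservedApps[appID] == 0 {
		delete(p.reservedApps, appID)
	}
	return nil
}

func main() {
	a := &ask{allocationKey: "ask-1"}
	n := &node{nodeID: "node-1", reservations: map[string]*ask{"ask-1": a}}
	p := &partition{reservedApps: map[string]int{"app-1": 1}}
	if err := p.unReserve("app-1", n, a); err != nil {
		// the caller could abort here instead of sending the alloc to the shim
		fmt.Println("unreserve failed:", err)
		return
	}
	fmt.Println("reservation removed, allocation can be confirmed")
}
```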

##########
File path: pkg/scheduler/partition.go
##########
@@ -0,0 +1,1101 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package scheduler
+
+import (
+	"fmt"
+	"math"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/looplab/fsm"
+	"go.uber.org/zap"
+
+	"github.com/apache/incubator-yunikorn-core/pkg/common"
+	"github.com/apache/incubator-yunikorn-core/pkg/common/configs"
+	"github.com/apache/incubator-yunikorn-core/pkg/common/resources"
+	"github.com/apache/incubator-yunikorn-core/pkg/common/security"
+	"github.com/apache/incubator-yunikorn-core/pkg/interfaces"
+	"github.com/apache/incubator-yunikorn-core/pkg/log"
+	"github.com/apache/incubator-yunikorn-core/pkg/metrics"
+	"github.com/apache/incubator-yunikorn-core/pkg/scheduler/objects"
+	"github.com/apache/incubator-yunikorn-core/pkg/scheduler/placement"
+	"github.com/apache/incubator-yunikorn-core/pkg/scheduler/policies"
+	"github.com/apache/incubator-yunikorn-core/pkg/webservice/dao"
+	"github.com/apache/incubator-yunikorn-scheduler-interface/lib/go/si"
+)
+
+type PartitionContext struct {
+	RmID string // the RM the partition belongs to
+	Name string // name of the partition (logging mainly)
+
+	// Private fields need protection
+	root                   *objects.Queue                  // start of the queue hierarchy
+	applications           map[string]*objects.Application // applications assigned to this partition
+	reservedApps           map[string]int                  // applications reserved within this partition, with reservation count
+	nodes                  map[string]*objects.Node        // nodes assigned to this partition
+	allocations            map[string]*objects.Allocation  // allocations
+	placementManager       *placement.AppPlacementManager  // placement manager for this partition
+	partitionManager       *partitionManager               // manager for this partition
+	stateMachine           *fsm.FSM                        // the state of the partition for scheduling
+	stateTime              time.Time                       // last time the state was updated (needed for cleanup)
+	isPreemptable          bool                            // can allocations be preempted
+	rules                  *[]configs.PlacementRule        // placement rules to be loaded by the scheduler
+	userGroupCache         *security.UserGroupCache        // user cache per partition
+	totalPartitionResource *resources.Resource             // Total node resources
+	nodeSortingPolicy      *policies.NodeSortingPolicy     // Global Node Sorting Policies
+
+	sync.RWMutex
+}
+
+func newPartitionContext(conf configs.PartitionConfig, rmID string, cc *ClusterContext) (*PartitionContext, error) {
+	if conf.Name == "" || rmID == "" {
+		log.Logger().Info("partition cannot be created",
+			zap.String("partition name", conf.Name),
+			zap.String("rmID", rmID),
+			zap.Any("cluster context", cc))
+		return nil, fmt.Errorf("partition cannot be created without name or RM, one is not set")
+	}
+	pc := &PartitionContext{
+		Name:         conf.Name,
+		RmID:         rmID,
+		stateMachine: objects.NewObjectState(),
+		stateTime:    time.Now(),
+		applications: make(map[string]*objects.Application),
+		reservedApps: make(map[string]int),
+		nodes:        make(map[string]*objects.Node),
+		allocations:  make(map[string]*objects.Allocation),
+	}
+	pc.partitionManager = &partitionManager{
+		pc: pc,
+		cc: cc,
+	}
+	if err := pc.initialPartitionFromConfig(conf); err != nil {
+		return nil, err
+	}
+	return pc, nil
+}
+
+// Initialise the partition
+func (pc *PartitionContext) initialPartitionFromConfig(conf configs.PartitionConfig) error {
+	if len(conf.Queues) == 0 || conf.Queues[0].Name != configs.RootQueue {
+		return fmt.Errorf("partition cannot be created without root queue")
+	}
+
+	// Set up the queue structure: root first, it should be the only queue at this level
+	// Add the rest of the queue structure recursively
+	queueConf := conf.Queues[0]
+	var err error
+	if pc.root, err = objects.NewConfiguredQueue(queueConf, nil); err != nil {
+		return err
+	}
+	// recursively add the queues to the root
+	if err = pc.addQueue(queueConf.Queues, pc.root); err != nil {
+		return err
+	}
+	log.Logger().Info("root queue added",
+		zap.String("partitionName", pc.Name),
+		zap.String("rmID", pc.RmID))
+
+	// set preemption needed flag
+	pc.isPreemptable = conf.Preemption.Enabled
+
+	pc.rules = &conf.PlacementRules
+	// We need to pass in the unlocked version of the getQueue function.
+	// Placing an application will already have a lock on the partition context.
+	pc.placementManager = placement.NewPlacementManager(*pc.rules, pc.getQueue)
+	// get the user group cache for the partition
+	// TODO get the resolver from the config
+	pc.userGroupCache = security.GetUserGroupCache("")
+
+	// TODO Need some more cleaner interface here.
+	var configuredPolicy policies.SortingPolicy
+	configuredPolicy, err = policies.FromString(conf.NodeSortPolicy.Type)
+	if err != nil {
+		log.Logger().Debug("NodeSorting policy incorrectly set or unknown",
+			zap.Error(err))
+	}
+	switch configuredPolicy {
+	case policies.BinPackingPolicy, policies.FairnessPolicy:
+		log.Logger().Info("NodeSorting policy set from config",
+			zap.String("policyName", configuredPolicy.String()))
+		pc.nodeSortingPolicy = policies.NewNodeSortingPolicy(conf.NodeSortPolicy.Type)
+	case policies.Unknown:
+		log.Logger().Info("NodeSorting policy not set using 'fair' as default")
+		pc.nodeSortingPolicy = policies.NewNodeSortingPolicy("fair")
+	}
+	return nil
+}
+
+func (pc *PartitionContext) updatePartitionDetails(conf configs.PartitionConfig) error {
+	pc.Lock()
+	defer pc.Unlock()
+	if len(conf.Queues) == 0 || conf.Queues[0].Name != configs.RootQueue {
+		return fmt.Errorf("partition cannot be created without root queue")
+	}
+
+	if pc.placementManager.IsInitialised() {
+		log.Logger().Info("Updating placement manager rules on config reload")
+		err := pc.placementManager.UpdateRules(conf.PlacementRules)
+		if err != nil {
+			log.Logger().Info("New placement rules not activated, config reload failed", zap.Error(err))
+			return err
+		}
+		pc.rules = &conf.PlacementRules
+	} else {
+		log.Logger().Info("Creating new placement manager on config reload")
+		pc.rules = &conf.PlacementRules
+		// We need to pass in the unlocked version of the getQueue function.
+		// Placing an application will already have a lock on the partition context.
+		pc.placementManager = placement.NewPlacementManager(*pc.rules, pc.getQueue)
+	}
+	// start at the root: there is only one queue
+	queueConf := conf.Queues[0]
+	root := pc.root
+	// update the root queue
+	if err := root.SetQueueConfig(queueConf); err != nil {
+		return err
+	}
+	root.UpdateSortType()
+	// update the rest of the queues recursively
+	return pc.updateQueues(queueConf.Queues, root)
+}
+
+// Process the config structure and create a queue info tree for this partition
+func (pc *PartitionContext) addQueue(conf []configs.QueueConfig, parent *objects.Queue) error {
+	// create the queue at this level
+	for _, queueConf := range conf {
+		thisQueue, err := objects.NewConfiguredQueue(queueConf, parent)
+		if err != nil {
+			return err
+		}
+		// recursively create the queues below
+		if len(queueConf.Queues) > 0 {
+			err = pc.addQueue(queueConf.Queues, thisQueue)
+			if err != nil {
+				return err
+			}
+		}
+	}
+	return nil
+}
+
+// Update the passed in queues and then do this recursively for the children
+//
+// NOTE: this is a lock free call. It should only be called holding the PartitionContext lock.
+func (pc *PartitionContext) updateQueues(config []configs.QueueConfig, parent *objects.Queue) error {
+	// get the name of the passed in queue
+	parentPath := parent.QueuePath + configs.DOT
+	// keep track of which children we have updated
+	visited := map[string]bool{}
+	// walk over the queues recursively
+	for _, queueConfig := range config {
+		pathName := parentPath + queueConfig.Name
+		queue := pc.getQueue(pathName)
+		var err error
+		if queue == nil {
+			queue, err = objects.NewConfiguredQueue(queueConfig, parent)
+		} else {
+			err = queue.SetQueueConfig(queueConfig)
+		}
+		if err != nil {
+			return err
+		}
+		// special call to convert to a real policy from the property
+		queue.UpdateSortType()
+		if err = pc.updateQueues(queueConfig.Queues, queue); err != nil {
+			return err
+		}
+		visited[queue.Name] = true
+	}
+	// remove all children that were not visited
+	for childName, childQueue := range parent.GetCopyOfChildren() {
+		if !visited[childName] {
+			childQueue.MarkQueueForRemoval()
+		}
+	}
+	return nil
+}
+
+// Mark the partition for removal from the system.
+// This can be executed multiple times and is only effective the first time.
+// The current cleanup sequence is "immediate". This is implemented to allow a graceful cleanup.
+func (pc *PartitionContext) markPartitionForRemoval() {
+	if err := pc.handlePartitionEvent(objects.Remove); err != nil {
+		log.Logger().Error("failed to mark partition for deletion",
+			zap.String("partitionName", pc.Name),
+			zap.Error(err))
+	}
+}
+
+// Get the state of the partition.
+// No new nodes and applications will be accepted if stopped or being removed.
+func (pc *PartitionContext) isDraining() bool {
+	return pc.stateMachine.Current() == objects.Draining.String()
+}
+
+func (pc *PartitionContext) isRunning() bool {
+	return pc.stateMachine.Current() == objects.Active.String()
+}
+
+func (pc *PartitionContext) isStopped() bool {
+	return pc.stateMachine.Current() == objects.Stopped.String()
+}
+
+// Handle the state event for the partition.
+// The state machine handles the locking.
+func (pc *PartitionContext) handlePartitionEvent(event objects.ObjectEvent) error {
+	err := pc.stateMachine.Event(event.String(), pc.Name)
+	if err == nil {
+		pc.stateTime = time.Now()
+		return nil
+	}
+	// handle the same state transition not nil error (limit of fsm).
+	if err.Error() == "no transition" {
+		return nil
+	}
+	return err
+}
+
+// Add a new application to the partition.
+func (pc *PartitionContext) AddApplication(app *objects.Application) error {
+	pc.Lock()
+	defer pc.Unlock()
+
+	if pc.isDraining() || pc.isStopped() {
+		return fmt.Errorf("partition %s is stopped cannot add a new application %s", pc.Name, app.ApplicationID)
+	}
+
+	// Add to applications
+	appID := app.ApplicationID
+	if pc.applications[appID] != nil {
+		return fmt.Errorf("adding application %s to partition %s, but application already existed", appID, pc.Name)
+	}
+
+	// Put app under the queue
+	queueName := app.QueueName
+	if pc.placementManager.IsInitialised() {
+		err := pc.placementManager.PlaceApplication(app)
+		if err != nil {
+			return fmt.Errorf("failed to place application %s: %v", appID, err)
+		}
+		queueName = app.QueueName
+		if queueName == "" {
+			return fmt.Errorf("application rejected by placement rules: %s", appID)
+		}
+	}
+	// we have a queue name either from placement or direct, get the queue
+	queue := pc.getQueue(queueName)
+	if queue == nil {
+		// queue must exist if not using placement rules
+		if !pc.placementManager.IsInitialised() {
+			return fmt.Errorf("application '%s' rejected, cannot create queue '%s' without placement rules", appID, queueName)
+		}
+		// with placement rules the hierarchy might not exist so try and create it
+		var err error
+		queue, err = pc.createQueue(queueName, app.GetUser())
+		if err != nil {
+			return fmt.Errorf("failed to create rule based queue %s for application %s", queueName, appID)
+		}
+	}
+	// check the queue: is a leaf queue with submit access
+	if !queue.IsLeafQueue() || !queue.CheckSubmitAccess(app.GetUser()) {
+		return fmt.Errorf("failed to find queue %s for application %s", queueName, appID)
+	}
+
+	// all is OK update the app and partition
+	app.SetQueue(queue)
+	queue.AddApplication(app)
+	pc.applications[appID] = app
+
+	return nil
+}
+
+// Remove the application from the partition.
+// This does not fail and handles missing /app/queue/node/allocations internally
+func (pc *PartitionContext) removeApplication(appID string) []*objects.Allocation {
+	pc.Lock()
+	defer pc.Unlock()
+
+	// Remove from applications map
+	if pc.applications[appID] == nil {
+		return nil
+	}
+	app := pc.applications[appID]
+	// remove from partition then cleanup underlying objects
+	delete(pc.applications, appID)
+	delete(pc.reservedApps, appID)
+
+	queueName := app.QueueName
+	// Remove all asks and thus all reservations and pending resources (queue included)
+	_ = app.RemoveAllocationAsk("")
+	// Remove app from queue
+	if queue := pc.getQueue(queueName); queue != nil {
+		queue.RemoveApplication(app)
+	}
+	// Remove all allocations
+	allocations := app.RemoveAllAllocations()
+	// Remove all allocations from nodes and the partition (queues have been updated already)
+	if len(allocations) != 0 {
+		for _, alloc := range allocations {
+			currentUUID := alloc.UUID
+			// Remove from partition
+			if globalAlloc := pc.allocations[currentUUID]; globalAlloc == nil {
+				log.Logger().Warn("unknown allocation: not found on the partition",
+					zap.String("appID", appID),
+					zap.String("allocationId", currentUUID))
+			} else {
+				delete(pc.allocations, currentUUID)
+			}
+
+			// Remove from node: even if not found on the partition to keep things clean
+			node := pc.nodes[alloc.NodeID]
+			if node == nil {
+				log.Logger().Warn("unknown node: not found in active node list",
+					zap.String("appID", appID),
+					zap.String("nodeID", alloc.NodeID))
+				continue
+			}
+			if nodeAlloc := node.RemoveAllocation(currentUUID); nodeAlloc == nil {
+				log.Logger().Warn("unknown allocation: not found on the node",
+					zap.String("appID", appID),
+					zap.String("allocationId", currentUUID),
+					zap.String("nodeID", alloc.NodeID))
+			}
+		}
+	}
+
+	log.Logger().Debug("application removed from the scheduler",
+		zap.String("queue", queueName),
+		zap.String("applicationID", appID))
+
+	return allocations
+}
+
+func (pc *PartitionContext) getApplication(appID string) *objects.Application {
+	pc.RLock()
+	defer pc.RUnlock()
+
+	return pc.applications[appID]
+}
+
+// Return a copy of the map of all reservations for the partition.
+// This will return an empty map if there are no reservations.
+// Visible for tests
+func (pc *PartitionContext) getReservations() map[string]int {
+	pc.RLock()
+	defer pc.RUnlock()
+	reserve := make(map[string]int)
+	for key, num := range pc.reservedApps {
+		reserve[key] = num
+	}
+	return reserve
+}
+
+// Get the queue from the structure based on the fully qualified name.
+// Wrapper around the unlocked version getQueue()
+// Visible by tests
+func (pc *PartitionContext) GetQueue(name string) *objects.Queue {
+	pc.RLock()
+	defer pc.RUnlock()
+	return pc.getQueue(name)
+}
+
+// Get the queue from the structure based on the fully qualified name.
+// The name is not syntax checked and must be valid.
+// Returns nil if the queue is not found otherwise the queue object.
+//
+// NOTE: this is a lock free call. It should only be called holding the PartitionContext lock.
+func (pc *PartitionContext) getQueue(name string) *objects.Queue {
+	// start at the root
+	queue := pc.root
+	part := strings.Split(strings.ToLower(name), configs.DOT)
+	// no input
+	if len(part) == 0 || part[0] != configs.RootQueue {
+		return nil
+	}
+	// walk over the parts going down towards the requested queue
+	for i := 1; i < len(part); i++ {
+		// if child not found break out and return
+		if queue = queue.GetChildQueue(part[i]); queue == nil {
+			break
+		}
+	}
+	return queue
+}
+
+// Get the queue info for the whole queue structure to pass to the webservice
+func (pc *PartitionContext) GetQueueInfos() dao.QueueDAOInfo {
+	return pc.root.GetQueueInfos()
+}
+
+// Create a queue with full hierarchy. This is called when a new queue is created from a placement rule.
+// The final leaf queue does not exist otherwise we would not get here.
+// This means that at least 1 queue (a leaf queue) will be created
+// NOTE: this is a lock free call. It should only be called holding the PartitionContext lock.
+func (pc *PartitionContext) createQueue(name string, user security.UserGroup) (*objects.Queue, error) {
+	// find the queue furthest down the hierarchy that exists
+	var toCreate []string
+	if !strings.HasPrefix(name, configs.RootQueue) || !strings.Contains(name, configs.DOT) {
+		return nil, fmt.Errorf("illegal queue name passed in: %s", name)
+	}
+	current := name
+	queue := pc.getQueue(current)
+	log.Logger().Debug("Checking queue creation")
+	for queue == nil {
+		toCreate = append(toCreate, current[strings.LastIndex(current, configs.DOT)+1:])
+		current = current[0:strings.LastIndex(current, configs.DOT)]
+		queue = pc.getQueue(current)
+	}
+	// Check the ACL before we really create
+	// The existing parent queue is the lowest we need to look at
+	if !queue.CheckSubmitAccess(user) {
+		return nil, fmt.Errorf("submit access to queue %s denied during create of: %s", current, name)
+	}
+	if queue.IsLeafQueue() {
+		return nil, fmt.Errorf("creation of queue %s failed parent is already a leaf: %s", name, current)
+	}
+	log.Logger().Debug("Creating queue(s)",
+		zap.String("parent", current),
+		zap.String("fullPath", name))
+	for i := len(toCreate) - 1; i >= 0; i-- {
+		// everything is checked and there should be no errors
+		var err error
+		queue, err = objects.NewDynamicQueue(toCreate[i], i == 0, queue)
+		if err != nil {
+			log.Logger().Warn("Queue auto create failed unexpected",
+				zap.String("queueName", toCreate[i]),
+				zap.Error(err))
+			return nil, err
+		}
+	}
+	return queue, nil
+}
+
+// Get a node from the partition by nodeID.
+func (pc *PartitionContext) GetNode(nodeID string) *objects.Node {
+	pc.RLock()
+	defer pc.RUnlock()
+
+	return pc.nodes[nodeID]
+}
+
+// Get a copy of the nodes from the partition.
+// This list does not include reserved nodes or nodes marked unschedulable
+func (pc *PartitionContext) getSchedulableNodes() []*objects.Node {
+	return pc.getNodes(true)
+}
+
+// Get a copy of the nodes from the partition.
+// Excludes unschedulable nodes only, reserved node inclusion depends on the parameter passed in.
+func (pc *PartitionContext) getNodes(excludeReserved bool) []*objects.Node {
+	pc.RLock()
+	defer pc.RUnlock()
+
+	nodes := make([]*objects.Node, 0)
+	for _, node := range pc.nodes {
+		// filter out the nodes that are not scheduling
+		if !node.IsSchedulable() || (excludeReserved && node.IsReserved()) {
+			continue
+		}
+		nodes = append(nodes, node)
+	}
+	return nodes
+}
+
+// Add the node to the partition and process the allocations that are reported by the node.
+func (pc *PartitionContext) AddNode(node *objects.Node, existingAllocations []*objects.Allocation) error {
+	if node == nil {
+		return fmt.Errorf("cannot add 'nil' node to partition %s", pc.Name)
+	}
+	pc.Lock()
+	defer pc.Unlock()
+
+	if pc.isDraining() || pc.isStopped() {
+		return fmt.Errorf("partition %s is stopped cannot add a new node %s", pc.Name, node.NodeID)
+	}
+
+	if pc.nodes[node.NodeID] != nil {
+		return fmt.Errorf("partition %s has an existing node %s, node name must be unique", pc.Name, node.NodeID)
+	}
+
+	log.Logger().Debug("adding node to partition",
+		zap.String("nodeID", node.NodeID),
+		zap.String("partition", pc.Name))
+
+	// update the resources available in the cluster
+	if pc.totalPartitionResource == nil {
+		pc.totalPartitionResource = node.GetCapacity().Clone()
+	} else {
+		pc.totalPartitionResource.AddTo(node.GetCapacity())
+	}
+	pc.root.SetMaxResource(pc.totalPartitionResource)
+
+	// Node is added to the system to allow processing of the allocations
+	pc.nodes[node.NodeID] = node
+	// Add allocations that exist on the node when added
+	if len(existingAllocations) > 0 {
+		log.Logger().Info("add existing allocations",
+			zap.String("nodeID", node.NodeID),
+			zap.Int("existingAllocations", len(existingAllocations)))
+		for current, alloc := range existingAllocations {
+			if err := pc.addAllocation(alloc); err != nil {
+				released := pc.removeNodeInternal(node.NodeID)
+				log.Logger().Info("failed to add existing allocations",
+					zap.String("nodeID", node.NodeID),
+					zap.Int("existingAllocations", len(existingAllocations)),
+					zap.Int("releasedAllocations", len(released)),
+					zap.Int("processingAlloc", current))
+				metrics.GetSchedulerMetrics().IncFailedNodes()
+				return err
+			}
+		}
+	}
+
+	// Node is added update the metrics
+	metrics.GetSchedulerMetrics().IncActiveNodes()
+	log.Logger().Info("added node to partition",
+		zap.String("nodeID", node.NodeID),
+		zap.String("partition", pc.Name))
+
+	return nil
+}
+
+// Remove a node from the partition. It returns all removed allocations.
+func (pc *PartitionContext) removeNode(nodeID string) []*objects.Allocation {
+	pc.Lock()
+	defer pc.Unlock()
+	return pc.removeNodeInternal(nodeID)
+}
+
+// Remove a node from the partition. It returns all removed allocations.
+// Unlocked version must be called holding the partition lock.
+func (pc *PartitionContext) removeNodeInternal(nodeID string) []*objects.Allocation {
+	log.Logger().Info("remove node from partition",
+		zap.String("nodeID", nodeID),
+		zap.String("partition", pc.Name))
+
+	node := pc.nodes[nodeID]
+	if node == nil {
+		log.Logger().Debug("node was not found",
+			zap.String("nodeID", nodeID),
+			zap.String("partitionName", pc.Name))
+		return nil
+	}
+
+	// Remove node from list of tracked nodes
+	delete(pc.nodes, nodeID)
+	metrics.GetSchedulerMetrics().DecActiveNodes()
+
+	// found the node, clean up the node and all linked data
+	released := pc.removeNodeAllocations(node)
+	pc.totalPartitionResource.SubFrom(node.GetCapacity())
+	pc.root.SetMaxResource(pc.totalPartitionResource)
+
+	// unreserve all the apps that were reserved on the node
+	reservedKeys, releasedAsks := node.UnReserveApps()
+	// update the partition reservations based on the node clean up
+	for i, appID := range reservedKeys {
+		pc.unReserveCount(appID, releasedAsks[i])
+	}
+
+	log.Logger().Info("node removed",
+		zap.String("partitionName", pc.Name),
+		zap.String("nodeID", node.NodeID))
+	return released
+}
+
+// Remove all allocations that are assigned to a node as part of the node removal. This is not part of the node object
+// as updating the applications and queues is the only goal. Applications and queues are not accessible from the node.
+// The removed allocations are returned.
+func (pc *PartitionContext) removeNodeAllocations(node *objects.Node) []*objects.Allocation {
+	released := make([]*objects.Allocation, 0)
+	// walk over all allocations still registered for this node
+	for _, alloc := range node.GetAllAllocations() {
+		allocID := alloc.UUID
+		// since we are not locking the node and or application we could have had an update while processing
+		// note that we do not return the allocation if the app or allocation is not found and assume that it
+		// was already removed
+		app := pc.applications[alloc.ApplicationID]
+		if app == nil {
+			log.Logger().Info("app is not found, skipping while removing the node",
+				zap.String("appID", alloc.ApplicationID),
+				zap.String("nodeID", node.NodeID))
+			continue
+		}
+		// check allocations on the app
+		if app.RemoveAllocation(allocID) == nil {
+			log.Logger().Info("allocation is not found, skipping while removing the node",
+				zap.String("allocationId", allocID),
+				zap.String("appID", app.ApplicationID),
+				zap.String("nodeID", node.NodeID))
+			continue
+		}
+		if err := app.GetQueue().DecAllocatedResource(alloc.AllocatedResource); err != nil {
+			log.Logger().Warn("failed to release resources from queue",
+				zap.String("appID", alloc.ApplicationID),
+				zap.Error(err))
+		}
+
+		// the allocation is removed so add it to the list that we return
+		released = append(released, alloc)
+		log.Logger().Info("allocation removed",
+			zap.String("allocationId", allocID),
+			zap.String("nodeID", node.NodeID))
+	}
+	return released
+}
+
+func (pc *PartitionContext) calculateOutstandingRequests() []*objects.AllocationAsk {
+	if !resources.StrictlyGreaterThanZero(pc.root.GetPendingResource()) {
+		return nil
+	}
+	outstanding := make([]*objects.AllocationAsk, 0)
+	pc.root.GetQueueOutstandingRequests(&outstanding)
+	return outstanding
+}
+
+// Try regular allocation for the partition
+// Lock free call: all locks are taken when needed in the called functions
+func (pc *PartitionContext) tryAllocate() *objects.Allocation {
+	if !resources.StrictlyGreaterThanZero(pc.root.GetPendingResource()) {
+		// nothing to do just return
+		return nil
+	}
+	// try allocating from the root down
+	alloc := pc.root.TryAllocate(pc.GetNodeIterator)
+	if alloc != nil {
+		return pc.allocate(alloc)
+	}
+	return nil
+}
+
+// Try process reservations for the partition
+// Lock free call: all locks are taken when needed in the called functions
+func (pc *PartitionContext) tryReservedAllocate() *objects.Allocation {
+	if !resources.StrictlyGreaterThanZero(pc.root.GetPendingResource()) {
+		// nothing to do just return
+		return nil
+	}
+	// try allocating from the root down
+	alloc := pc.root.TryReservedAllocate(pc.GetNodeIterator)
+	if alloc != nil {
+		return pc.allocate(alloc)

Review comment:
       I am a bit worried about this.
   What if we get an `alloc` from `pc.root.TryReservedAllocate`, but on this line we fail to allocate it to the partition? It looks like all `pc.allocate(alloc)` really does is:
   1. process the reservation or unreservation
   2. add the `alloc` to the partition's allocations map
   
   Two questions follow from that:
   1. If we made an `alloc` for a reserved container but the node or app no longer exists, this returns nil, so we will not notify the RM about the allocation and we skip the cache update. However, the alloc has already been recorded on the app/queue/node, so it looks like this leaks the resources. What should we do here? (a rough sketch follows below)
   2. Why do we need this allocations map in the partition? Can we remove it?
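
   To make the leak in point 1 concrete, here is a minimal, self-contained sketch of a hypothetical compensating rollback when the partition-level confirmation returns nil. All names and types are simplified stand-ins, not the real scheduler objects; whether the core should roll back like this, or resync with the shim instead, is exactly the open question.

```go
package main

import "fmt"

// Simplified stand-ins; illustration only.
type allocation struct {
	uuid  string
	appID string
	node  string
	res   int64
}

type partition struct {
	apps        map[string]bool
	nodes       map[string]bool
	allocations map[string]*allocation
	queueUsed   int64 // resources already booked on the queue hierarchy
}

// confirm mirrors the shape of pc.allocate(): it returns nil when the app or
// node disappeared between the allocation attempt and the partition update.
func (p *partition) confirm(a *allocation) *allocation {
	if !p.apps[a.appID] || !p.nodes[a.node] {
		return nil
	}
	p.allocations[a.uuid] = a
	return a
}

// rollback is the hypothetical compensation step: release what was already
// booked lower in the hierarchy (app/queue/node) when confirmation fails.
func (p *partition) rollback(a *allocation) {
	p.queueUsed -= a.res
}

func main() {
	p := &partition{
		apps:        map[string]bool{"app-1": true},
		nodes:       map[string]bool{}, // the node was removed while allocating
		allocations: map[string]*allocation{},
		queueUsed:   4, // already added by the allocation attempt
	}
	a := &allocation{uuid: "uuid-1", appID: "app-1", node: "node-1", res: 4}
	if p.confirm(a) == nil {
		p.rollback(a) // without this step the 4 units stay booked: the leak
	}
	fmt.Println("queue usage after failed confirmation:", p.queueUsed) // 0
}
```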




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org