You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@yunikorn.apache.org by GitBox <gi...@apache.org> on 2021/03/01 09:58:04 UTC

[GitHub] [incubator-yunikorn-core] wilfred-s commented on a change in pull request #250: [YUNIKORN-519, YUNIKORN-460] Placeholder cleanup

wilfred-s commented on a change in pull request #250:
URL: https://github.com/apache/incubator-yunikorn-core/pull/250#discussion_r584555869



##########
File path: pkg/scheduler/objects/application.go
##########
@@ -225,6 +238,57 @@ func (sa *Application) clearStateTimer() {
 		zap.String("state", sa.stateMachine.Current()))
 }
 
+func (sa *Application) isWaitingStateTimedOut() bool {
+	return sa.IsWaiting() && sa.stateTimer == nil
+}
+func (sa *Application) initPlaceholderTimer() {
+	if sa.placeholderTimer != nil || !sa.IsAccepted() || sa.execTimeout <= 0 {
+		return
+	}
+	log.Logger().Debug("Application placeholder timer initiated",
+		zap.String("AppID", sa.ApplicationID),
+		zap.Duration("Timeout", sa.execTimeout))
+	sa.placeholderTimer = time.AfterFunc(sa.execTimeout, sa.timeoutPlaceholderProcessing)
+}
+
+func (sa *Application) clearPlaceholderTimer() {
+	if sa == nil || sa.placeholderTimer == nil {
+		return
+	}
+	sa.placeholderTimer.Stop()
+	sa.placeholderTimer = nil
+}
+
+func (sa *Application) timeoutPlaceholderProcessing() {
+	sa.Lock()
+	defer sa.Unlock()
+	// Case 1: if all app's placeholders are allocated, only part of them gets replaced, just delete the remaining placeholders
+	switch {
+	case sa.allPlaceholdersAllocated() && !sa.allPlaceholdersReplaced():
+		{
+			for _, alloc := range sa.GetPlaceholderAllocations() {
+				alloc.Result = PlaceholderExpired

Review comment:
       We can send these directly to the shim using the RM event handler. That saves us the extra step of going through the context.
   We should also mark each of these placeholders as released. The shim will take a while to release the pods, and the nodes etc. can only be updated after the shim has actually released them.

##########
File path: pkg/scheduler/objects/application.go
##########
@@ -225,6 +238,57 @@ func (sa *Application) clearStateTimer() {
 		zap.String("state", sa.stateMachine.Current()))
 }
 
+func (sa *Application) isWaitingStateTimedOut() bool {
+	return sa.IsWaiting() && sa.stateTimer == nil
+}
+func (sa *Application) initPlaceholderTimer() {
+	if sa.placeholderTimer != nil || !sa.IsAccepted() || sa.execTimeout <= 0 {
+		return
+	}
+	log.Logger().Debug("Application placeholder timer initiated",
+		zap.String("AppID", sa.ApplicationID),
+		zap.Duration("Timeout", sa.execTimeout))
+	sa.placeholderTimer = time.AfterFunc(sa.execTimeout, sa.timeoutPlaceholderProcessing)
+}
+
+func (sa *Application) clearPlaceholderTimer() {
+	if sa == nil || sa.placeholderTimer == nil {
+		return
+	}
+	sa.placeholderTimer.Stop()
+	sa.placeholderTimer = nil
+}
+
+func (sa *Application) timeoutPlaceholderProcessing() {
+	sa.Lock()
+	defer sa.Unlock()
+	// Case 1: if all app's placeholders are allocated, only part of them gets replaced, just delete the remaining placeholders
+	switch {
+	case sa.allPlaceholdersAllocated() && !sa.allPlaceholdersReplaced():
+		{
+			for _, alloc := range sa.GetPlaceholderAllocations() {
+				alloc.Result = PlaceholderExpired
+			}
+		}
+	default:
+		{
+			// Case 2: in every other case fail the application, and notify the context about the expired placeholders
+			if err := sa.HandleApplicationEvent(KillApplication); err != nil {
+				log.Logger().Debug("Application state change failed when placeholder timed out",
+					zap.String("AppID", sa.ApplicationID),
+					zap.String("currentState", sa.CurrentState()),
+					zap.Error(err))
+			}
+		}
+	}
+
+	sa.rmEventHandlers.SchedulerEventHandler.HandleEvent(&rmevent.RMPartitionPlaceholderExpiredEvent{

Review comment:
       We can send the same message directly from here using the RM event handler, without going through the context.

##########
File path: pkg/scheduler/objects/application.go
##########
@@ -193,20 +197,29 @@ func (sa *Application) setStateTimer(timeout time.Duration, currentState string,
 		zap.String("state", sa.stateMachine.Current()),
 		zap.Duration("timeout", timeout))
 
-	sa.stateTimer = time.AfterFunc(timeout, sa.timeoutTimer(currentState, event))
+	sa.stateTimer = time.AfterFunc(timeout, sa.timeoutStateTimer(currentState, event))
 }
 
-func (sa *Application) timeoutTimer(expectedState string, event applicationEvent) func() {
+func (sa *Application) timeoutStateTimer(expectedState string, event applicationEvent) func() {
 	return func() {
 		// make sure we are still in the right state
 		// we could have been killed or something might have happened while waiting for a lock
 		if expectedState == sa.stateMachine.Current() {
 			log.Logger().Debug("Application state: auto progress",
 				zap.String("applicationID", sa.ApplicationID),
 				zap.String("state", sa.stateMachine.Current()))
-
-			//nolint: errcheck
-			_ = sa.HandleApplicationEvent(event)
+			// if the app is waiting, but there are placeholders left, first do the cleanup
+			if sa.IsWaiting() && !resources.IsZero(sa.GetPlaceholderResource()) && sa.rmEventHandlers.SchedulerEventHandler != nil {
+				sa.rmEventHandlers.SchedulerEventHandler.HandleEvent(&rmevent.RMPartitionAppCompleteEvent{

Review comment:
       Send this message directly to the RM from here as well.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org