You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@yunikorn.apache.org by "wilfred-s (via GitHub)" <gi...@apache.org> on 2023/04/02 02:52:00 UTC

[GitHub] [yunikorn-k8shim] wilfred-s commented on a diff in pull request #565: [YUNIKORN-1670] Make application recovery more robust

wilfred-s commented on code in PR #565:
URL: https://github.com/apache/yunikorn-k8shim/pull/565#discussion_r1155229855


##########
pkg/appmgmt/appmgmt_recovery.go:
##########
@@ -71,33 +71,47 @@ func (svc *AppManagementService) recoverApps() (map[string]interfaces.ManagedApp
 	return recoveringApps, nil
 }
 
-func (svc *AppManagementService) waitForAppRecovery(
-	recoveringApps map[string]interfaces.ManagedApp, maxTimeout time.Duration) error {
-	if len(recoveringApps) > 0 {
-		log.Logger().Info("wait for app recovery",
-			zap.Int("appToRecover", len(recoveringApps)))
-		// check app states periodically, ensure all apps exit from recovering state
-		if err := utils.WaitForCondition(func() bool {
-			for _, app := range recoveringApps {
-				log.Logger().Debug("appInfo",
-					zap.String("appId", app.GetApplicationID()),
-					zap.String("state", app.GetApplicationState()))
-				if app.GetApplicationState() == cache.ApplicationStates().Accepted {
-					delete(recoveringApps, app.GetApplicationID())
-				}
-			}
-
-			if len(recoveringApps) == 0 {
-				log.Logger().Info("app recovery is successful")
-				return true
-			}
-
+func (svc *AppManagementService) waitForAppRecovery(recoveringApps map[string]interfaces.ManagedApp) bool {
+	svc.cancelRecovery.Store(false) // reset cancellation token
+	recoveryStartTime := time.Now()
+	counter := 0
+	for {
+		// check for cancellation token
+		if svc.cancelRecovery.Load() {
+			log.Logger().Info("Waiting for recovery canceled.")
+			svc.cancelRecovery.Store(false)
 			return false
-		}, 1*time.Second, maxTimeout); err != nil {
-			return fmt.Errorf("timeout waiting for app recovery in %s",
-				maxTimeout.String())
 		}
+
+		svc.removeRecoveredApps(recoveringApps)
+		if len(recoveringApps) == 0 {
+			log.Logger().Info("Application recovery complete.")
+			return true
+		}
+		counter++
+		if counter%10 == 0 {
+			log.Logger().Info("Waiting for application recovery",
+				zap.Duration("timeElapsed", time.Since(recoveryStartTime).Round(time.Second)),
+				zap.Int("appsRemaining", len(recoveringApps)))
+		}
+		time.Sleep(1 * time.Second)
 	}
+}
 
-	return nil
+// cancelWaitForAppRecovery is used by testing code to ensure that waitForAppRecovery() does not block forever

Review Comment:
   nit: drop the () after the function name in the comment.



##########
pkg/appmgmt/appmgmt_recovery.go:
##########
@@ -71,33 +71,47 @@ func (svc *AppManagementService) recoverApps() (map[string]interfaces.ManagedApp
 	return recoveringApps, nil
 }
 
-func (svc *AppManagementService) waitForAppRecovery(
-	recoveringApps map[string]interfaces.ManagedApp, maxTimeout time.Duration) error {
-	if len(recoveringApps) > 0 {
-		log.Logger().Info("wait for app recovery",
-			zap.Int("appToRecover", len(recoveringApps)))
-		// check app states periodically, ensure all apps exit from recovering state
-		if err := utils.WaitForCondition(func() bool {
-			for _, app := range recoveringApps {
-				log.Logger().Debug("appInfo",
-					zap.String("appId", app.GetApplicationID()),
-					zap.String("state", app.GetApplicationState()))
-				if app.GetApplicationState() == cache.ApplicationStates().Accepted {
-					delete(recoveringApps, app.GetApplicationID())
-				}
-			}
-
-			if len(recoveringApps) == 0 {
-				log.Logger().Info("app recovery is successful")
-				return true
-			}
-
+func (svc *AppManagementService) waitForAppRecovery(recoveringApps map[string]interfaces.ManagedApp) bool {
+	svc.cancelRecovery.Store(false) // reset cancellation token
+	recoveryStartTime := time.Now()
+	counter := 0
+	for {
+		// check for cancellation token
+		if svc.cancelRecovery.Load() {
+			log.Logger().Info("Waiting for recovery canceled.")
+			svc.cancelRecovery.Store(false)
 			return false
-		}, 1*time.Second, maxTimeout); err != nil {
-			return fmt.Errorf("timeout waiting for app recovery in %s",
-				maxTimeout.String())
 		}
+
+		svc.removeRecoveredApps(recoveringApps)
+		if len(recoveringApps) == 0 {
+			log.Logger().Info("Application recovery complete.")
+			return true
+		}
+		counter++
+		if counter%10 == 0 {
+			log.Logger().Info("Waiting for application recovery",
+				zap.Duration("timeElapsed", time.Since(recoveryStartTime).Round(time.Second)),
+				zap.Int("appsRemaining", len(recoveringApps)))
+		}
+		time.Sleep(1 * time.Second)
 	}
+}
 
-	return nil
+// cancelWaitForAppRecovery is used by testing code to ensure that waitForAppRecovery() does not block forever
+func (svc *AppManagementService) cancelWaitForAppRecovery() {
+	svc.cancelRecovery.Store(true)
+}
+
+// removeRecoveredApps() is used to walk the currently recovering apps list and remove those that have finished recovering

Review Comment:
   nit: drop the () after the function name in the comment.



##########
pkg/appmgmt/appmgmt_recovery.go:
##########
@@ -28,18 +28,18 @@ import (
 	"github.com/apache/yunikorn-k8shim/pkg/appmgmt/general"
 	"github.com/apache/yunikorn-k8shim/pkg/appmgmt/interfaces"
 	"github.com/apache/yunikorn-k8shim/pkg/cache"
-	"github.com/apache/yunikorn-k8shim/pkg/common/utils"
 	"github.com/apache/yunikorn-k8shim/pkg/log"
 )
 
-func (svc *AppManagementService) WaitForRecovery(maxTimeout time.Duration) error {
+func (svc *AppManagementService) WaitForRecovery() error {

Review Comment:
   Please document the expected behaviour of this function in both normal operation and test setups.
   It looks like the abort could happen in production, but the change explicitly says there is no timeout for app recovery.



##########
pkg/appmgmt/appmgmt_recovery.go:
##########
@@ -71,33 +71,47 @@ func (svc *AppManagementService) recoverApps() (map[string]interfaces.ManagedApp
 	return recoveringApps, nil
 }
 
-func (svc *AppManagementService) waitForAppRecovery(
-	recoveringApps map[string]interfaces.ManagedApp, maxTimeout time.Duration) error {
-	if len(recoveringApps) > 0 {
-		log.Logger().Info("wait for app recovery",
-			zap.Int("appToRecover", len(recoveringApps)))
-		// check app states periodically, ensure all apps exit from recovering state
-		if err := utils.WaitForCondition(func() bool {
-			for _, app := range recoveringApps {
-				log.Logger().Debug("appInfo",
-					zap.String("appId", app.GetApplicationID()),
-					zap.String("state", app.GetApplicationState()))
-				if app.GetApplicationState() == cache.ApplicationStates().Accepted {
-					delete(recoveringApps, app.GetApplicationID())
-				}
-			}
-
-			if len(recoveringApps) == 0 {
-				log.Logger().Info("app recovery is successful")
-				return true
-			}
-
+func (svc *AppManagementService) waitForAppRecovery(recoveringApps map[string]interfaces.ManagedApp) bool {
+	svc.cancelRecovery.Store(false) // reset cancellation token
+	recoveryStartTime := time.Now()
+	counter := 0
+	for {
+		// check for cancellation token
+		if svc.cancelRecovery.Load() {
+			log.Logger().Info("Waiting for recovery canceled.")
+			svc.cancelRecovery.Store(false)

Review Comment:
   Why reset? If recovery is cancelled we should never re-run it, and a new recovery sets it to false on line 75 anyway.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@yunikorn.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org