You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@yunikorn.apache.org by "wilfred-s (via GitHub)" <gi...@apache.org> on 2023/04/02 02:52:00 UTC
[GitHub] [yunikorn-k8shim] wilfred-s commented on a diff in pull request #565: [YUNIKORN-1670] Make application recovery more robust
wilfred-s commented on code in PR #565:
URL: https://github.com/apache/yunikorn-k8shim/pull/565#discussion_r1155229855
##########
pkg/appmgmt/appmgmt_recovery.go:
##########
@@ -71,33 +71,47 @@ func (svc *AppManagementService) recoverApps() (map[string]interfaces.ManagedApp
return recoveringApps, nil
}
-func (svc *AppManagementService) waitForAppRecovery(
- recoveringApps map[string]interfaces.ManagedApp, maxTimeout time.Duration) error {
- if len(recoveringApps) > 0 {
- log.Logger().Info("wait for app recovery",
- zap.Int("appToRecover", len(recoveringApps)))
- // check app states periodically, ensure all apps exit from recovering state
- if err := utils.WaitForCondition(func() bool {
- for _, app := range recoveringApps {
- log.Logger().Debug("appInfo",
- zap.String("appId", app.GetApplicationID()),
- zap.String("state", app.GetApplicationState()))
- if app.GetApplicationState() == cache.ApplicationStates().Accepted {
- delete(recoveringApps, app.GetApplicationID())
- }
- }
-
- if len(recoveringApps) == 0 {
- log.Logger().Info("app recovery is successful")
- return true
- }
-
+func (svc *AppManagementService) waitForAppRecovery(recoveringApps map[string]interfaces.ManagedApp) bool {
+ svc.cancelRecovery.Store(false) // reset cancellation token
+ recoveryStartTime := time.Now()
+ counter := 0
+ for {
+ // check for cancellation token
+ if svc.cancelRecovery.Load() {
+ log.Logger().Info("Waiting for recovery canceled.")
+ svc.cancelRecovery.Store(false)
return false
- }, 1*time.Second, maxTimeout); err != nil {
- return fmt.Errorf("timeout waiting for app recovery in %s",
- maxTimeout.String())
}
+
+ svc.removeRecoveredApps(recoveringApps)
+ if len(recoveringApps) == 0 {
+ log.Logger().Info("Application recovery complete.")
+ return true
+ }
+ counter++
+ if counter%10 == 0 {
+ log.Logger().Info("Waiting for application recovery",
+ zap.Duration("timeElapsed", time.Since(recoveryStartTime).Round(time.Second)),
+ zap.Int("appsRemaining", len(recoveringApps)))
+ }
+ time.Sleep(1 * time.Second)
}
+}
- return nil
+// cancelWaitForAppRecovery is used by testing code to ensure that waitForAppRecovery() does not block forever
Review Comment:
nit: drop the () after the function name in the comment
##########
pkg/appmgmt/appmgmt_recovery.go:
##########
@@ -71,33 +71,47 @@ func (svc *AppManagementService) recoverApps() (map[string]interfaces.ManagedApp
return recoveringApps, nil
}
-func (svc *AppManagementService) waitForAppRecovery(
- recoveringApps map[string]interfaces.ManagedApp, maxTimeout time.Duration) error {
- if len(recoveringApps) > 0 {
- log.Logger().Info("wait for app recovery",
- zap.Int("appToRecover", len(recoveringApps)))
- // check app states periodically, ensure all apps exit from recovering state
- if err := utils.WaitForCondition(func() bool {
- for _, app := range recoveringApps {
- log.Logger().Debug("appInfo",
- zap.String("appId", app.GetApplicationID()),
- zap.String("state", app.GetApplicationState()))
- if app.GetApplicationState() == cache.ApplicationStates().Accepted {
- delete(recoveringApps, app.GetApplicationID())
- }
- }
-
- if len(recoveringApps) == 0 {
- log.Logger().Info("app recovery is successful")
- return true
- }
-
+func (svc *AppManagementService) waitForAppRecovery(recoveringApps map[string]interfaces.ManagedApp) bool {
+ svc.cancelRecovery.Store(false) // reset cancellation token
+ recoveryStartTime := time.Now()
+ counter := 0
+ for {
+ // check for cancellation token
+ if svc.cancelRecovery.Load() {
+ log.Logger().Info("Waiting for recovery canceled.")
+ svc.cancelRecovery.Store(false)
return false
- }, 1*time.Second, maxTimeout); err != nil {
- return fmt.Errorf("timeout waiting for app recovery in %s",
- maxTimeout.String())
}
+
+ svc.removeRecoveredApps(recoveringApps)
+ if len(recoveringApps) == 0 {
+ log.Logger().Info("Application recovery complete.")
+ return true
+ }
+ counter++
+ if counter%10 == 0 {
+ log.Logger().Info("Waiting for application recovery",
+ zap.Duration("timeElapsed", time.Since(recoveryStartTime).Round(time.Second)),
+ zap.Int("appsRemaining", len(recoveringApps)))
+ }
+ time.Sleep(1 * time.Second)
}
+}
- return nil
+// cancelWaitForAppRecovery is used by testing code to ensure that waitForAppRecovery() does not block forever
+func (svc *AppManagementService) cancelWaitForAppRecovery() {
+ svc.cancelRecovery.Store(true)
+}
+
+// removeRecoveredApps() is used to walk the currently recovering apps list and remove those that have finished recovering
Review Comment:
nit: drop the () after the function name in the comment
##########
pkg/appmgmt/appmgmt_recovery.go:
##########
@@ -28,18 +28,18 @@ import (
"github.com/apache/yunikorn-k8shim/pkg/appmgmt/general"
"github.com/apache/yunikorn-k8shim/pkg/appmgmt/interfaces"
"github.com/apache/yunikorn-k8shim/pkg/cache"
- "github.com/apache/yunikorn-k8shim/pkg/common/utils"
"github.com/apache/yunikorn-k8shim/pkg/log"
)
-func (svc *AppManagementService) WaitForRecovery(maxTimeout time.Duration) error {
+func (svc *AppManagementService) WaitForRecovery() error {
Review Comment:
Please document the expected behaviour of this function under both the normal and the test case setup.
It looks like the abort could also happen in production, but the change explicitly says there is no timeout for app recovery.
##########
pkg/appmgmt/appmgmt_recovery.go:
##########
@@ -71,33 +71,47 @@ func (svc *AppManagementService) recoverApps() (map[string]interfaces.ManagedApp
return recoveringApps, nil
}
-func (svc *AppManagementService) waitForAppRecovery(
- recoveringApps map[string]interfaces.ManagedApp, maxTimeout time.Duration) error {
- if len(recoveringApps) > 0 {
- log.Logger().Info("wait for app recovery",
- zap.Int("appToRecover", len(recoveringApps)))
- // check app states periodically, ensure all apps exit from recovering state
- if err := utils.WaitForCondition(func() bool {
- for _, app := range recoveringApps {
- log.Logger().Debug("appInfo",
- zap.String("appId", app.GetApplicationID()),
- zap.String("state", app.GetApplicationState()))
- if app.GetApplicationState() == cache.ApplicationStates().Accepted {
- delete(recoveringApps, app.GetApplicationID())
- }
- }
-
- if len(recoveringApps) == 0 {
- log.Logger().Info("app recovery is successful")
- return true
- }
-
+func (svc *AppManagementService) waitForAppRecovery(recoveringApps map[string]interfaces.ManagedApp) bool {
+ svc.cancelRecovery.Store(false) // reset cancellation token
+ recoveryStartTime := time.Now()
+ counter := 0
+ for {
+ // check for cancellation token
+ if svc.cancelRecovery.Load() {
+ log.Logger().Info("Waiting for recovery canceled.")
+ svc.cancelRecovery.Store(false)
Review Comment:
Why reset? If recovery is cancelled we should never re-run it, and a new recovery sets it to false on line 75.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: reviews-unsubscribe@yunikorn.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org