You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@yunikorn.apache.org by GitBox <gi...@apache.org> on 2022/05/05 10:37:26 UTC

[GitHub] [yunikorn-k8shim] wilfred-s commented on a diff in pull request #415: [POC][YUNIKORN-1194] Task objects are not created when applications are restored in recoverApps()

wilfred-s commented on code in PR #415:
URL: https://github.com/apache/yunikorn-k8shim/pull/415#discussion_r864538813


##########
pkg/appmgmt/appmgmt_recovery.go:
##########
@@ -47,27 +47,54 @@ func (svc *AppManagementService) recoverApps() (map[string]interfaces.ManagedApp
 	recoveringApps := make(map[string]interfaces.ManagedApp)
 	for _, mgr := range svc.managers {
 		if m, ok := mgr.(interfaces.Recoverable); ok {
-			appMetas, err := m.ListApplications()
+			appMetas, pods, err := m.ListApplications()
 			if err != nil {
 				log.Logger().Error("failed to list apps", zap.Error(err))
 				return recoveringApps, err
 			}
 
-			// trigger recovery of the apps
-			// this is simply submit the app again
+			// Collect the ManagedApp objects and also call AddApplication()
+			// to register the app
+			log.Logger().Debug("Adding retrieved applications")
+			apps := make([]interfaces.ManagedApp, 0)
 			for _, appMeta := range appMetas {
+				log.Logger().Debug("Adding application", zap.String("appId", appMeta.ApplicationID))
 				if app := svc.amProtocol.AddApplication(
 					&interfaces.AddApplicationRequest{
-						Metadata: appMeta,
+						Metadata:    appMeta,
+						ForceUpdate: true,
 					}); app != nil {
+
 					recoveringApps[app.GetApplicationID()] = app
-					if err := app.TriggerAppRecovery(); err != nil {
-						log.Logger().Error("failed to recover app", zap.Error(err))
-						return recoveringApps, fmt.Errorf("failed to recover app %s, reason: %v",
-							app.GetApplicationID(), err)
+					apps = append(apps, app)
+				}
+			}
+
+			// Collect the tasks and call AddTask()
+			log.Logger().Debug("Adding tasks from retrieved pods")
+			for _, pod := range pods {
+				if taskMeta, ok := svc.generalMgr.GetTaskMetadata(pod); ok {
+					if app := svc.amProtocol.GetApplication(taskMeta.ApplicationID); app != nil {
+						if _, taskErr := app.GetTask(string(pod.UID)); taskErr != nil {
+							log.Logger().Debug("Adding task", zap.Any("taskMeta", taskMeta.TaskID))
+							svc.amProtocol.AddTask(&interfaces.AddTaskRequest{
+								Metadata: taskMeta,
+							})
+						}
 					}
 				}
 			}
+
+			// trigger recovery of the apps
+			// this is simply submit the app again
+			for _, app := range apps {
+				log.Logger().Info("Triggering recovery for app", zap.String("appId", app.GetApplicationID()))
+				if err := app.TriggerAppRecovery(); err != nil {
+					log.Logger().Error("failed to recover app", zap.Error(err))
+					return recoveringApps, fmt.Errorf("failed to recover app %s, reason: %v",

Review Comment:
   This returns from the loop on the first failed app recovery, so the remaining apps are never recovered. Do we need to continue recovery and collect all errors, or do we fail completely when this happens?
   Does this leave YuniKorn in a broken state?



##########
pkg/appmgmt/general/general.go:
##########
@@ -90,7 +90,7 @@ func (os *Manager) Stop() {
 	// noop
 }
 
-func (os *Manager) getTaskMetadata(pod *v1.Pod) (interfaces.TaskMetadata, bool) {
+func (os *Manager) GetTaskMetadata(pod *v1.Pod) (interfaces.TaskMetadata, bool) {

Review Comment:
   That is caused by `interfaces.TaskMetadata`.
   We need to leave it here for now and revisit this whole area as part of YUNIKORN-1195.



##########
pkg/cache/context.go:
##########
@@ -595,7 +595,9 @@ func (ctx *Context) getNamespaceObject(namespace string) *v1.Namespace {
 
 func (ctx *Context) AddApplication(request *interfaces.AddApplicationRequest) interfaces.ManagedApp {
 	log.Logger().Debug("AddApplication", zap.Any("Request", request))
-	if app := ctx.GetApplication(request.Metadata.ApplicationID); app != nil {
+	appID := request.Metadata.ApplicationID
+	if app := ctx.GetApplication(request.Metadata.ApplicationID); app != nil && !request.ForceUpdate {
+		log.Logger().Debug("Application already in the context", zap.String("appID", appID))

Review Comment:
   Keep this separate: solve this issue first, then make this change in a follow-up for YUNIKORN-1186.



##########
pkg/appmgmt/general/general.go:
##########
@@ -324,32 +324,34 @@ func (os *Manager) deletePod(obj interface{}) {
 		zap.String("podName", pod.Name),
 		zap.String("podUID", string(pod.UID)))
 
-	if taskMeta, ok := os.getTaskMetadata(pod); ok {
+	if taskMeta, ok := os.GetTaskMetadata(pod); ok {
 		if app := os.amProtocol.GetApplication(taskMeta.ApplicationID); app != nil {
 			os.amProtocol.NotifyTaskComplete(taskMeta.ApplicationID, taskMeta.TaskID)
 		}
 	}
 }
 
-func (os *Manager) ListApplications() (map[string]interfaces.ApplicationMetadata, error) {
+func (os *Manager) ListApplications() (map[string]interfaces.ApplicationMetadata, []*v1.Pod, error) {
 	log.Logger().Info("Retrieving pod list")
 	// list all pods on this cluster
 	appPods, err := os.apiProvider.GetAPIs().PodInformer.Lister().List(labels.NewSelector())
 	if err != nil {
-		return nil, err
+		return nil, nil, err
 	}
 	log.Logger().Info("Pod list retrieved from api server", zap.Int("nr of pods", len(appPods)))
 	// get existing apps
 	existingApps := make(map[string]interfaces.ApplicationMetadata)
 	podsRecovered := 0
 	podsWithoutMetaData := 0
+	pods := make([]*v1.Pod, 0)
 	for _, pod := range appPods {
 		log.Logger().Debug("Looking at pod for recovery candidates", zap.String("podNamespace", pod.Namespace), zap.String("podName", pod.Name))
 		// general filter passes, and pod is assigned
 		// this means the pod is already scheduled by scheduler for an existing app
 		if utils.GeneralPodFilter(pod) && utils.IsAssignedPod(pod) {
-			if meta, ok := os.getAppMetadata(pod, true); ok {
+			if meta, ok := os.GetAppMetadata(pod, true); ok {
 				podsRecovered++
+				pods = append(pods, pod)

Review Comment:
   Should we track this as part of the app metadata instead of returning a separate list of pods?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@yunikorn.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org