Posted to reviews@yunikorn.apache.org by "wilfred-s (via GitHub)" <gi...@apache.org> on 2023/05/24 07:13:39 UTC

[GitHub] [yunikorn-k8shim] wilfred-s commented on a diff in pull request #598: [YUNIKORN-1555] Deduplicate async pod events during recovery

wilfred-s commented on code in PR #598:
URL: https://github.com/apache/yunikorn-k8shim/pull/598#discussion_r1203570985


##########
pkg/appmgmt/appmgmt_recovery.go:
##########
@@ -63,14 +64,18 @@ func (svc *AppManagementService) recoverApps() (map[string]interfaces.ManagedApp
 				return pods[i].CreationTimestamp.Unix() < pods[j].CreationTimestamp.Unix()
 			})
 
+			// Track pods that we have already seen in order to
+			// skip redundant handling of async events in RecoveryDone
+			seenEvents := make(map[types.UID]string)
 			for _, pod := range pods {
 				if utils.NeedRecovery(pod) {
 					app := svc.podEventHandler.HandleEvent(general.AddPod, general.Recovery, pod)
 					recoveringApps[app.GetApplicationID()] = app
 				}
+				seenEvents[pod.UID] = pod.GetResourceVersion()

Review Comment:
   Not sure about this; see below.



##########
pkg/appmgmt/appmgmt_recovery.go:
##########
@@ -63,14 +64,18 @@ func (svc *AppManagementService) recoverApps() (map[string]interfaces.ManagedApp
 				return pods[i].CreationTimestamp.Unix() < pods[j].CreationTimestamp.Unix()
 			})
 
+			// Track pods that we have already seen in order to
+			// skip redundant handling of async events in RecoveryDone
+			seenEvents := make(map[types.UID]string)

Review Comment:
   I don't think `seenEvents` is the right name; it should be `seenPod`, storing the resource version.
   Everywhere else, for example in the cache, we key on the UID converted to a string, because `types.UID` is just a named string type:
   ```
   key := string(pod.UID)
   ```
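A minimal sketch of the suggested naming and keying. It assumes a local `UID` type that mirrors `types.UID` from `k8s.io/apimachinery` (declared there as `type UID string`) and a hypothetical trimmed-down `pod` struct; `seenPods` and `recordSeen` are illustrative names, not code from the PR:

```go
package main

import "fmt"

// UID mirrors k8s.io/apimachinery/pkg/types.UID, a defined string type.
// It is redeclared here only so the sketch is self-contained.
type UID string

// pod is a hypothetical stand-in for *v1.Pod with only the fields used here.
type pod struct {
	UID             UID
	ResourceVersion string
}

// recordSeen stores the resource version of p, keyed by its UID converted
// to a plain string, following the `key := string(pod.UID)` convention.
func recordSeen(seenPods map[string]string, p pod) {
	seenPods[string(p.UID)] = p.ResourceVersion
}

func main() {
	seenPods := make(map[string]string)
	recordSeen(seenPods, pod{UID: "uid-1", ResourceVersion: "7"})
	fmt.Println(seenPods["uid-1"]) // prints 7
}
```

The explicit `string(...)` conversion is required because a defined type is not interchangeable with `string` as a map key type; keying on `string` keeps the map consistent with the other caches.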



##########
pkg/appmgmt/general/podevent_handler_test.go:
##########
@@ -73,21 +73,48 @@ func TestHandleAsyncEventWhenNotRecovering(t *testing.T) {
 func TestRecoveryDone(t *testing.T) {
 	amProtocol := cache.NewMockedAMProtocol()
 	podEventHandler := NewPodEventHandler(amProtocol, true)
+
 	pod1 := newPod("pod1")
+	pod2 := newPod("pod2")
+	pod3 := newPod("pod3")
+	pod3v1 := newPodWithResourceVersion(pod3.GetName(), "1")
+	pod3v2 := newPodWithResourceVersion(pod3.GetName(), "2")
+
 	podEventHandler.HandleEvent(AddPod, Informers, pod1)
+	podEventHandler.HandleEvent(AddPod, Informers, pod2)
 	podEventHandler.HandleEvent(DeletePod, Informers, pod1)
+	podEventHandler.HandleEvent(AddPod, Informers, pod3v1)
+	podEventHandler.HandleEvent(UpdatePod, Informers, pod3v2)
 
-	podEventHandler.RecoveryDone()
+	seenEvents := map[types.UID]string{
+		pod2.UID: pod2.GetResourceVersion(), // should not be added
+		pod3.UID: pod3.GetResourceVersion(), // should be updated with new version and ultimately completed
+	}
+	podEventHandler.RecoveryDone(seenEvents)
 
 	assert.Equal(t, len(podEventHandler.asyncEvents), 0)
 	app := amProtocol.GetApplication(appID)
+
 	task, err := app.GetTask("pod1")
 	assert.NilError(t, err)
 	assert.Equal(t, cache.TaskStates().Completed, task.GetTaskState())
+
+	_, err = app.GetTask("pod2")
+	assert.ErrorContains(t, err, "task pod2 doesn't exist in application")
+
+	task, err = app.GetTask("pod3")
+	assert.NilError(t, err)
+	assert.Equal(t, "1", task.GetTaskPod().GetResourceVersion())

Review Comment:
   I think the assumption in the code that an UpdatePod event should transition the pod to finished is not correct. `Manager.updatePod` first compares the current pod with the new pod and only calls the handler to transition the pod when needed.
   
   With VPA (Vertical Pod Autoscaler) we might need to handle more for updates, but that is a discussion for later.
   
   I think `PodEventHandler.updatePod` needs to receive both pods, and the logic of `Manager.updatePod` needs to move into it. That requires a bit of "surgery" on the PodEventHandler, **BUT** not as part of this change.
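A rough sketch of an update handler that receives both the old and the new pod and only transitions when something relevant changed. It assumes the relevant check is whether the pod has just entered a terminal phase (Succeeded or Failed); the `podPhase`, `pod` and `handler` types and this `updatePod` signature are hypothetical and do not reflect the current yunikorn-k8shim API:

```go
package main

import "fmt"

// podPhase is a hypothetical stand-in for v1.PodPhase.
type podPhase string

const (
	podRunning   podPhase = "Running"
	podSucceeded podPhase = "Succeeded"
	podFailed    podPhase = "Failed"
)

// pod is a hypothetical, trimmed-down pod object.
type pod struct {
	Name  string
	Phase podPhase
}

// isTerminated reports whether the pod is in a terminal phase.
func isTerminated(p pod) bool {
	return p.Phase == podSucceeded || p.Phase == podFailed
}

// handler is a hypothetical event handler that records completed pods.
type handler struct {
	completed []string
}

// updatePod compares both versions of the pod and only completes the task
// when this update moved the pod into a terminal phase. It returns true if
// a transition was triggered.
func (h *handler) updatePod(oldPod, newPod pod) bool {
	if isTerminated(newPod) && !isTerminated(oldPod) {
		h.completed = append(h.completed, newPod.Name)
		return true
	}
	// Any other update (labels, resources, ...) leaves the task state alone.
	return false
}

func main() {
	h := &handler{}
	fmt.Println(h.updatePod(pod{"pod3", podRunning}, pod{"pod3", podRunning}))   // prints false
	fmt.Println(h.updatePod(pod{"pod3", podRunning}, pod{"pod3", podSucceeded})) // prints true
}
```

Checking the old pod as well means a repeated update on an already terminated pod does not complete the task a second time.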



##########
pkg/appmgmt/general/podevent_handler.go:
##########
@@ -99,6 +100,14 @@ func (p *PodEventHandler) RecoveryDone() {
 		log.Logger().Info("Processing async events that arrived during recovery",
 			zap.Int("no. of events", noOfEvents))
 		for _, event := range p.asyncEvents {
+			// Adds get special treatment because most of these events will have
+			// been processed in the initial app recovery routine
+			// If the pod resource version of the seen event matches that of the async event,
+			// this indicates that it is a duplicate object and should be skipped
+			seenVersion, ok := seenEvents[event.pod.UID]
+			if ok && seenVersion == event.pod.GetResourceVersion() && event.eventType == AddPod {

Review Comment:
   Do we need to process an update if its resource version is the same as the one we have already processed?
   That is also why I questioned where the pod is added to `seenEvents` above.
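If updates carrying an already-processed resource version should also be skipped, the check in `RecoveryDone` could be widened beyond `AddPod`. A minimal sketch, assuming a hypothetical `asyncEvent` type and a `seenPods` map keyed by the string UID; the event constants only mirror the names used in the diff. Delete events are always kept here so that a removal is never lost, which is a choice of this sketch rather than something settled in the review:

```go
package main

import "fmt"

type eventType int

// Mirrors the event names used in the diff; values are illustrative.
const (
	AddPod eventType = iota
	UpdatePod
	DeletePod
)

// asyncEvent is a hypothetical event queued while recovery was running.
type asyncEvent struct {
	eventType       eventType
	uid             string
	resourceVersion string
}

// filterDuplicates drops add and update events whose resource version
// matches the version already handled during recovery. Delete events,
// unseen pods and newer versions are kept in their original order.
func filterDuplicates(events []asyncEvent, seenPods map[string]string) []asyncEvent {
	kept := make([]asyncEvent, 0, len(events))
	for _, e := range events {
		seenVersion, ok := seenPods[e.uid]
		if ok && seenVersion == e.resourceVersion && e.eventType != DeletePod {
			continue // same object already processed during recovery
		}
		kept = append(kept, e)
	}
	return kept
}

func main() {
	seen := map[string]string{"pod2": "1", "pod3": "1"}
	events := []asyncEvent{
		{AddPod, "pod2", "1"},    // duplicate of a recovered pod: skipped
		{AddPod, "pod3", "1"},    // duplicate: skipped
		{UpdatePod, "pod3", "1"}, // same version: skipped
		{UpdatePod, "pod3", "2"}, // newer version: kept
		{DeletePod, "pod2", "1"}, // deletes are always kept
	}
	fmt.Println(len(filterDuplicates(events, seen))) // prints 2
}
```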



-- 
This is an automated message from the Apache Git Service.