You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@yunikorn.apache.org by cc...@apache.org on 2024/01/19 18:52:15 UTC

(yunikorn-core) branch master updated: [YUNIKORN-2329] Ensure RMProxy events are processed in-order (#776)

This is an automated email from the ASF dual-hosted git repository.

ccondit pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git


The following commit(s) were added to refs/heads/master by this push:
     new bce75ba3 [YUNIKORN-2329] Ensure RMProxy events are processed in-order (#776)
bce75ba3 is described below

commit bce75ba326840030d34bec5c953db40642cbca8d
Author: Yu-Lin Chen <kh...@gmail.com>
AuthorDate: Fri Jan 19 12:50:10 2024 -0600

    [YUNIKORN-2329] Ensure RMProxy events are processed in-order (#776)
    
    Remove the unnecessary goroutine invocations in the UpdateApplication(),
    UpdateAllcoation() and UpdateNode() functions, ensuring that events are
    passed to the scheduler's HandleEvent() function in the order received.
    
    Closes: #776
    
    Signed-off-by: Craig Condit <cc...@apache.org>
---
 pkg/rmproxy/rmproxy.go | 91 ++++++++++++++++++++------------------------------
 1 file changed, 37 insertions(+), 54 deletions(-)

diff --git a/pkg/rmproxy/rmproxy.go b/pkg/rmproxy/rmproxy.go
index c607eea3..ecf06bea 100644
--- a/pkg/rmproxy/rmproxy.go
+++ b/pkg/rmproxy/rmproxy.go
@@ -314,37 +314,26 @@ func (rmp *RMProxy) UpdateAllocation(request *si.AllocationRequest) error {
 	if rmp.GetResourceManagerCallback(request.RmID) == nil {
 		return fmt.Errorf("received AllocationRequest, but RmID=\"%s\" not registered", request.RmID)
 	}
-	go func() {
-		// Update allocations
-		if len(request.Allocations) > 0 {
-			for _, alloc := range request.Allocations {
-				alloc.PartitionName = common.GetNormalizedPartitionName(alloc.PartitionName, request.RmID)
-			}
-		}
-
-		// Update asks
-		if len(request.Asks) > 0 {
-			for _, ask := range request.Asks {
-				ask.PartitionName = common.GetNormalizedPartitionName(ask.PartitionName, request.RmID)
-			}
-		}
+	// Update allocations
+	for _, alloc := range request.Allocations {
+		alloc.PartitionName = common.GetNormalizedPartitionName(alloc.PartitionName, request.RmID)
+	}
 
-		// Update releases
-		if request.Releases != nil {
-			if len(request.Releases.AllocationsToRelease) > 0 {
-				for _, rel := range request.Releases.AllocationsToRelease {
-					rel.PartitionName = common.GetNormalizedPartitionName(rel.PartitionName, request.RmID)
-				}
-			}
+	// Update asks
+	for _, ask := range request.Asks {
+		ask.PartitionName = common.GetNormalizedPartitionName(ask.PartitionName, request.RmID)
+	}
 
-			if len(request.Releases.AllocationAsksToRelease) > 0 {
-				for _, rel := range request.Releases.AllocationAsksToRelease {
-					rel.PartitionName = common.GetNormalizedPartitionName(rel.PartitionName, request.RmID)
-				}
-			}
+	// Update releases
+	if request.Releases != nil {
+		for _, rel := range request.Releases.AllocationsToRelease {
+			rel.PartitionName = common.GetNormalizedPartitionName(rel.PartitionName, request.RmID)
 		}
-		rmp.EventHandlers.SchedulerEventHandler.HandleEvent(&rmevent.RMUpdateAllocationEvent{Request: request})
-	}()
+		for _, rel := range request.Releases.AllocationAsksToRelease {
+			rel.PartitionName = common.GetNormalizedPartitionName(rel.PartitionName, request.RmID)
+		}
+	}
+	rmp.EventHandlers.SchedulerEventHandler.HandleEvent(&rmevent.RMUpdateAllocationEvent{Request: request})
 	return nil
 }
 
@@ -353,21 +342,17 @@ func (rmp *RMProxy) UpdateApplication(request *si.ApplicationRequest) error {
 		return fmt.Errorf("received ApplicationRequest, but RmID=\"%s\" not registered", request.RmID)
 	}
 
-	go func() {
-		// Update New apps
-		if len(request.New) > 0 {
-			for _, app := range request.New {
-				app.PartitionName = common.GetNormalizedPartitionName(app.PartitionName, request.RmID)
-			}
-		}
-		// Update Remove apps
-		if len(request.Remove) > 0 {
-			for _, app := range request.Remove {
-				app.PartitionName = common.GetNormalizedPartitionName(app.PartitionName, request.RmID)
-			}
-		}
-		rmp.EventHandlers.SchedulerEventHandler.HandleEvent(&rmevent.RMUpdateApplicationEvent{Request: request})
-	}()
+	// Update New apps
+	for _, app := range request.New {
+		app.PartitionName = common.GetNormalizedPartitionName(app.PartitionName, request.RmID)
+	}
+
+	// Update Remove apps
+	for _, app := range request.Remove {
+		app.PartitionName = common.GetNormalizedPartitionName(app.PartitionName, request.RmID)
+	}
+
+	rmp.EventHandlers.SchedulerEventHandler.HandleEvent(&rmevent.RMUpdateApplicationEvent{Request: request})
 	return nil
 }
 
@@ -375,18 +360,16 @@ func (rmp *RMProxy) UpdateNode(request *si.NodeRequest) error {
 	if rmp.GetResourceManagerCallback(request.RmID) == nil {
 		return fmt.Errorf("received NodeRequest, but RmID=\"%s\" not registered", request.RmID)
 	}
-	go func() {
-		if len(request.Nodes) > 0 {
-			for _, node := range request.Nodes {
-				if len(node.GetAttributes()) == 0 {
-					node.Attributes = map[string]string{}
-				}
-				partition := node.Attributes[siCommon.NodePartition]
-				node.Attributes[siCommon.NodePartition] = common.GetNormalizedPartitionName(partition, request.RmID)
-			}
+
+	for _, node := range request.Nodes {
+		if len(node.GetAttributes()) == 0 {
+			node.Attributes = map[string]string{}
 		}
-		rmp.EventHandlers.SchedulerEventHandler.HandleEvent(&rmevent.RMUpdateNodeEvent{Request: request})
-	}()
+		partition := node.Attributes[siCommon.NodePartition]
+		node.Attributes[siCommon.NodePartition] = common.GetNormalizedPartitionName(partition, request.RmID)
+	}
+
+	rmp.EventHandlers.SchedulerEventHandler.HandleEvent(&rmevent.RMUpdateNodeEvent{Request: request})
 	return nil
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@yunikorn.apache.org
For additional commands, e-mail: issues-help@yunikorn.apache.org