You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@yunikorn.apache.org by cc...@apache.org on 2024/01/19 18:52:15 UTC
(yunikorn-core) branch master updated: [YUNIKORN-2329] Ensure RMProxy events are processed in-order (#776)
This is an automated email from the ASF dual-hosted git repository.
ccondit pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/master by this push:
new bce75ba3 [YUNIKORN-2329] Ensure RMProxy events are processed in-order (#776)
bce75ba3 is described below
commit bce75ba326840030d34bec5c953db40642cbca8d
Author: Yu-Lin Chen <kh...@gmail.com>
AuthorDate: Fri Jan 19 12:50:10 2024 -0600
[YUNIKORN-2329] Ensure RMProxy events are processed in-order (#776)
Remove the unnecessary goroutine invocations in the UpdateApplication(),
UpdateAllocation() and UpdateNode() functions, ensuring that events are
passed to the scheduler's HandleEvent() function in the order received.
Closes: #776
Signed-off-by: Craig Condit <cc...@apache.org>
---
pkg/rmproxy/rmproxy.go | 91 ++++++++++++++++++++------------------------------
1 file changed, 37 insertions(+), 54 deletions(-)
diff --git a/pkg/rmproxy/rmproxy.go b/pkg/rmproxy/rmproxy.go
index c607eea3..ecf06bea 100644
--- a/pkg/rmproxy/rmproxy.go
+++ b/pkg/rmproxy/rmproxy.go
@@ -314,37 +314,26 @@ func (rmp *RMProxy) UpdateAllocation(request *si.AllocationRequest) error {
if rmp.GetResourceManagerCallback(request.RmID) == nil {
return fmt.Errorf("received AllocationRequest, but RmID=\"%s\" not registered", request.RmID)
}
- go func() {
- // Update allocations
- if len(request.Allocations) > 0 {
- for _, alloc := range request.Allocations {
- alloc.PartitionName = common.GetNormalizedPartitionName(alloc.PartitionName, request.RmID)
- }
- }
-
- // Update asks
- if len(request.Asks) > 0 {
- for _, ask := range request.Asks {
- ask.PartitionName = common.GetNormalizedPartitionName(ask.PartitionName, request.RmID)
- }
- }
+ // Update allocations
+ for _, alloc := range request.Allocations {
+ alloc.PartitionName = common.GetNormalizedPartitionName(alloc.PartitionName, request.RmID)
+ }
- // Update releases
- if request.Releases != nil {
- if len(request.Releases.AllocationsToRelease) > 0 {
- for _, rel := range request.Releases.AllocationsToRelease {
- rel.PartitionName = common.GetNormalizedPartitionName(rel.PartitionName, request.RmID)
- }
- }
+ // Update asks
+ for _, ask := range request.Asks {
+ ask.PartitionName = common.GetNormalizedPartitionName(ask.PartitionName, request.RmID)
+ }
- if len(request.Releases.AllocationAsksToRelease) > 0 {
- for _, rel := range request.Releases.AllocationAsksToRelease {
- rel.PartitionName = common.GetNormalizedPartitionName(rel.PartitionName, request.RmID)
- }
- }
+ // Update releases
+ if request.Releases != nil {
+ for _, rel := range request.Releases.AllocationsToRelease {
+ rel.PartitionName = common.GetNormalizedPartitionName(rel.PartitionName, request.RmID)
}
- rmp.EventHandlers.SchedulerEventHandler.HandleEvent(&rmevent.RMUpdateAllocationEvent{Request: request})
- }()
+ for _, rel := range request.Releases.AllocationAsksToRelease {
+ rel.PartitionName = common.GetNormalizedPartitionName(rel.PartitionName, request.RmID)
+ }
+ }
+ rmp.EventHandlers.SchedulerEventHandler.HandleEvent(&rmevent.RMUpdateAllocationEvent{Request: request})
return nil
}
@@ -353,21 +342,17 @@ func (rmp *RMProxy) UpdateApplication(request *si.ApplicationRequest) error {
return fmt.Errorf("received ApplicationRequest, but RmID=\"%s\" not registered", request.RmID)
}
- go func() {
- // Update New apps
- if len(request.New) > 0 {
- for _, app := range request.New {
- app.PartitionName = common.GetNormalizedPartitionName(app.PartitionName, request.RmID)
- }
- }
- // Update Remove apps
- if len(request.Remove) > 0 {
- for _, app := range request.Remove {
- app.PartitionName = common.GetNormalizedPartitionName(app.PartitionName, request.RmID)
- }
- }
- rmp.EventHandlers.SchedulerEventHandler.HandleEvent(&rmevent.RMUpdateApplicationEvent{Request: request})
- }()
+ // Update New apps
+ for _, app := range request.New {
+ app.PartitionName = common.GetNormalizedPartitionName(app.PartitionName, request.RmID)
+ }
+
+ // Update Remove apps
+ for _, app := range request.Remove {
+ app.PartitionName = common.GetNormalizedPartitionName(app.PartitionName, request.RmID)
+ }
+
+ rmp.EventHandlers.SchedulerEventHandler.HandleEvent(&rmevent.RMUpdateApplicationEvent{Request: request})
return nil
}
@@ -375,18 +360,16 @@ func (rmp *RMProxy) UpdateNode(request *si.NodeRequest) error {
if rmp.GetResourceManagerCallback(request.RmID) == nil {
return fmt.Errorf("received NodeRequest, but RmID=\"%s\" not registered", request.RmID)
}
- go func() {
- if len(request.Nodes) > 0 {
- for _, node := range request.Nodes {
- if len(node.GetAttributes()) == 0 {
- node.Attributes = map[string]string{}
- }
- partition := node.Attributes[siCommon.NodePartition]
- node.Attributes[siCommon.NodePartition] = common.GetNormalizedPartitionName(partition, request.RmID)
- }
+
+ for _, node := range request.Nodes {
+ if len(node.GetAttributes()) == 0 {
+ node.Attributes = map[string]string{}
}
- rmp.EventHandlers.SchedulerEventHandler.HandleEvent(&rmevent.RMUpdateNodeEvent{Request: request})
- }()
+ partition := node.Attributes[siCommon.NodePartition]
+ node.Attributes[siCommon.NodePartition] = common.GetNormalizedPartitionName(partition, request.RmID)
+ }
+
+ rmp.EventHandlers.SchedulerEventHandler.HandleEvent(&rmevent.RMUpdateNodeEvent{Request: request})
return nil
}
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@yunikorn.apache.org
For additional commands, e-mail: issues-help@yunikorn.apache.org