You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@yunikorn.apache.org by cc...@apache.org on 2023/08/23 15:07:24 UTC

[yunikorn-k8shim] branch master updated: [YUNIKORN-1930] Don't send all node events to the shim (#658)

This is an automated email from the ASF dual-hosted git repository.

ccondit pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-k8shim.git


The following commit(s) were added to refs/heads/master by this push:
     new beed0b24 [YUNIKORN-1930] Don't send all node events to the shim (#658)
beed0b24 is described below

commit beed0b2412b3ccc032772191c7182e4c026064de
Author: Peter Bacsko <pb...@cloudera.com>
AuthorDate: Wed Aug 23 10:07:09 2023 -0500

    [YUNIKORN-1930] Don't send all node events to the shim (#658)
    
    Closes: #658
    
    Signed-off-by: Craig Condit <cc...@apache.org>
---
 pkg/cache/context.go      |  17 ++++++--
 pkg/cache/context_test.go | 108 ++++++++++++++++++++++++++++++++++++++++++++--
 2 files changed, 118 insertions(+), 7 deletions(-)

diff --git a/pkg/cache/context.go b/pkg/cache/context.go
index ce401068..3495e725 100644
--- a/pkg/cache/context.go
+++ b/pkg/cache/context.go
@@ -982,6 +982,9 @@ func (ctx *Context) PublishEvents(eventRecords []*si.EventRecord) {
 						zap.Stringer("event", record))
 				}
 			case si.EventRecord_NODE:
+				if !isPublishableNodeEvent(record) {
+					continue
+				}
 				nodeID := record.ObjectID
 				nodeInfo := ctx.schedulerCache.GetNode(nodeID)
 				if nodeInfo == nil {
@@ -999,9 +1002,6 @@ func (ctx *Context) PublishEvents(eventRecords []*si.EventRecord) {
 				}
 				events.GetRecorder().Eventf(node.DeepCopy(), nil,
 					v1.EventTypeNormal, "", "", record.Message)
-			default:
-				log.Log(log.ShimContext).Warn("Unsupported event type, currently only supports to publish request event records",
-					zap.Stringer("type", record.Type))
 			}
 		}
 	}
@@ -1152,6 +1152,17 @@ func (ctx *Context) GetStateDump() (string, error) {
 	return string(bytes), nil
 }
 
+func isPublishableNodeEvent(event *si.EventRecord) bool {
+	// only two NODE events are publishable: ADD with DETAILS_NONE (node added) and REMOVE with NODE_DECOMISSION (node removed)
+	if event.Type == si.EventRecord_NODE &&
+		((event.EventChangeDetail == si.EventRecord_DETAILS_NONE && event.EventChangeType == si.EventRecord_ADD) ||
+			(event.EventChangeDetail == si.EventRecord_NODE_DECOMISSION && event.EventChangeType == si.EventRecord_REMOVE)) {
+		return true
+	}
+
+	return false
+}
+
 // VisibleForTesting
 func (ctx *Context) GetSchedulerCache() *schedulercache.SchedulerCache {
 	return ctx.schedulerCache
diff --git a/pkg/cache/context_test.go b/pkg/cache/context_test.go
index 57a80754..0a3af42d 100644
--- a/pkg/cache/context_test.go
+++ b/pkg/cache/context_test.go
@@ -950,18 +950,22 @@ func TestNodeEventPublishedCorrectly(t *testing.T) {
 		},
 	}
 	context.addNode(&node)
+	err := waitForNodeAcceptedEvent(recorder)
+	assert.NilError(t, err, "node accepted event was not sent")
 
 	eventRecords := make([]*si.EventRecord, 0)
 	message := "node_related_message"
 	eventRecords = append(eventRecords, &si.EventRecord{
-		Type:     si.EventRecord_NODE,
-		ObjectID: "host0001",
-		Message:  message,
+		Type:              si.EventRecord_NODE,
+		EventChangeType:   si.EventRecord_ADD,
+		EventChangeDetail: si.EventRecord_DETAILS_NONE,
+		ObjectID:          "host0001",
+		Message:           message,
 	})
 	context.PublishEvents(eventRecords)
 
 	// check that the event has been published
-	err := utils.WaitForCondition(func() bool {
+	err = utils.WaitForCondition(func() bool {
 		for {
 			select {
 			case event := <-recorder.Events:
@@ -977,6 +981,84 @@ func TestNodeEventPublishedCorrectly(t *testing.T) {
 	assert.NilError(t, err, "event should have been emitted")
 }
 
+func TestFilteredEventsNotPublished(t *testing.T) {
+	conf.GetSchedulerConf().SetTestMode(true)
+	recorder, ok := events.GetRecorder().(*k8sEvents.FakeRecorder)
+	if !ok {
+		t.Fatal("the EventRecorder is expected to be of type FakeRecorder")
+	}
+	context := initContextForTest()
+
+	node := v1.Node{
+		ObjectMeta: apis.ObjectMeta{
+			Name:      "host0001",
+			Namespace: "default",
+			UID:       "uid_0001",
+		},
+	}
+	context.addNode(&node)
+	err := waitForNodeAcceptedEvent(recorder) // consume the "accepted" event expected after addNode so it is not mistaken for a published one below
+	assert.NilError(t, err, "node accepted event was not sent")
+
+	eventRecords := make([]*si.EventRecord, 7) // five NODE events that do not match the publishable filter, plus one APP and one QUEUE event
+	eventRecords[0] = &si.EventRecord{
+		Type:              si.EventRecord_NODE,
+		EventChangeType:   si.EventRecord_SET,
+		EventChangeDetail: si.EventRecord_NODE_SCHEDULABLE,
+		ObjectID:          "host0001",
+		Message:           "",
+	}
+	eventRecords[1] = &si.EventRecord{
+		Type:              si.EventRecord_NODE,
+		EventChangeType:   si.EventRecord_SET,
+		EventChangeDetail: si.EventRecord_NODE_READY,
+		ObjectID:          "host0001",
+		Message:           "",
+	}
+	eventRecords[2] = &si.EventRecord{
+		Type:              si.EventRecord_NODE,
+		EventChangeType:   si.EventRecord_SET,
+		EventChangeDetail: si.EventRecord_NODE_OCCUPIED,
+		ObjectID:          "host0001",
+		Message:           "",
+	}
+	eventRecords[3] = &si.EventRecord{
+		Type:              si.EventRecord_NODE,
+		EventChangeType:   si.EventRecord_SET,
+		EventChangeDetail: si.EventRecord_NODE_CAPACITY,
+		ObjectID:          "host0001",
+		Message:           "",
+	}
+	eventRecords[4] = &si.EventRecord{
+		Type:              si.EventRecord_NODE,
+		EventChangeType:   si.EventRecord_ADD,
+		EventChangeDetail: si.EventRecord_NODE_ALLOC,
+		ObjectID:          "host0001",
+		Message:           "",
+	}
+	eventRecords[5] = &si.EventRecord{
+		Type:              si.EventRecord_APP,
+		EventChangeType:   si.EventRecord_ADD,
+		EventChangeDetail: si.EventRecord_APP_STARTING,
+		ObjectID:          "app-1",
+		Message:           "",
+	}
+	eventRecords[6] = &si.EventRecord{
+		Type:              si.EventRecord_QUEUE,
+		EventChangeType:   si.EventRecord_ADD,
+		EventChangeDetail: si.EventRecord_DETAILS_NONE,
+		ObjectID:          "root.test",
+		Message:           "",
+	}
+	context.PublishEvents(eventRecords)
+
+	select {
+	case e := <-recorder.Events:
+		t.Errorf("received an unexpected event %s", e)
+	default: // nothing waiting on the recorder channel: none of the records was published
+	}
+}
+
 func TestPublishEventsWithNotExistingAsk(t *testing.T) {
 	conf.GetSchedulerConf().SetTestMode(true)
 	recorder, ok := events.GetRecorder().(*k8sEvents.FakeRecorder)
@@ -1497,3 +1579,21 @@ func TestCtxUpdatePodCondition(t *testing.T) {
 	updated = context.updatePodCondition(task, &condition)
 	assert.Equal(t, true, updated)
 }
+
+func waitForNodeAcceptedEvent(recorder *k8sEvents.FakeRecorder) error {
+	// drain recorder.Events until a message containing "accepted by the scheduler" is seen; retried via utils.WaitForCondition (10ms / 1s, presumably poll interval and timeout)
+	err := utils.WaitForCondition(func() bool {
+		for {
+			select {
+			case event := <-recorder.Events:
+				log.Log(log.Test).Info(event)
+				if strings.Contains(event, "accepted by the scheduler") {
+					return true
+				}
+			default: // channel is empty right now: report "not yet" to WaitForCondition
+				return false
+			}
+		}
+	}, 10*time.Millisecond, time.Second)
+	return err
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@yunikorn.apache.org
For additional commands, e-mail: issues-help@yunikorn.apache.org