You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@yunikorn.apache.org by cc...@apache.org on 2023/08/23 15:07:24 UTC
[yunikorn-k8shim] branch master updated: [YUNIKORN-1930] Don't send all node events to the shim (#658)
This is an automated email from the ASF dual-hosted git repository.
ccondit pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-k8shim.git
The following commit(s) were added to refs/heads/master by this push:
new beed0b24 [YUNIKORN-1930] Don't send all node events to the shim (#658)
beed0b24 is described below
commit beed0b2412b3ccc032772191c7182e4c026064de
Author: Peter Bacsko <pb...@cloudera.com>
AuthorDate: Wed Aug 23 10:07:09 2023 -0500
[YUNIKORN-1930] Don't send all node events to the shim (#658)
Closes: #658
Signed-off-by: Craig Condit <cc...@apache.org>
---
pkg/cache/context.go | 17 ++++++--
pkg/cache/context_test.go | 108 ++++++++++++++++++++++++++++++++++++++++++++--
2 files changed, 118 insertions(+), 7 deletions(-)
diff --git a/pkg/cache/context.go b/pkg/cache/context.go
index ce401068..3495e725 100644
--- a/pkg/cache/context.go
+++ b/pkg/cache/context.go
@@ -982,6 +982,9 @@ func (ctx *Context) PublishEvents(eventRecords []*si.EventRecord) {
zap.Stringer("event", record))
}
case si.EventRecord_NODE:
+ if !isPublishableNodeEvent(record) {
+ continue
+ }
nodeID := record.ObjectID
nodeInfo := ctx.schedulerCache.GetNode(nodeID)
if nodeInfo == nil {
@@ -999,9 +1002,6 @@ func (ctx *Context) PublishEvents(eventRecords []*si.EventRecord) {
}
events.GetRecorder().Eventf(node.DeepCopy(), nil,
v1.EventTypeNormal, "", "", record.Message)
- default:
- log.Log(log.ShimContext).Warn("Unsupported event type, currently only supports to publish request event records",
- zap.Stringer("type", record.Type))
}
}
}
@@ -1152,6 +1152,17 @@ func (ctx *Context) GetStateDump() (string, error) {
return string(bytes), nil
}
+func isPublishableNodeEvent(event *si.EventRecord) bool {
+ // we only send node added & removed event
+ if event.Type == si.EventRecord_NODE &&
+ ((event.EventChangeDetail == si.EventRecord_DETAILS_NONE && event.EventChangeType == si.EventRecord_ADD) ||
+ (event.EventChangeDetail == si.EventRecord_NODE_DECOMISSION && event.EventChangeType == si.EventRecord_REMOVE)) {
+ return true
+ }
+
+ return false
+}
+
// VisibleForTesting
func (ctx *Context) GetSchedulerCache() *schedulercache.SchedulerCache {
return ctx.schedulerCache
diff --git a/pkg/cache/context_test.go b/pkg/cache/context_test.go
index 57a80754..0a3af42d 100644
--- a/pkg/cache/context_test.go
+++ b/pkg/cache/context_test.go
@@ -950,18 +950,22 @@ func TestNodeEventPublishedCorrectly(t *testing.T) {
},
}
context.addNode(&node)
+ err := waitForNodeAcceptedEvent(recorder)
+ assert.NilError(t, err, "node accepted event was not sent")
eventRecords := make([]*si.EventRecord, 0)
message := "node_related_message"
eventRecords = append(eventRecords, &si.EventRecord{
- Type: si.EventRecord_NODE,
- ObjectID: "host0001",
- Message: message,
+ Type: si.EventRecord_NODE,
+ EventChangeType: si.EventRecord_ADD,
+ EventChangeDetail: si.EventRecord_DETAILS_NONE,
+ ObjectID: "host0001",
+ Message: message,
})
context.PublishEvents(eventRecords)
// check that the event has been published
- err := utils.WaitForCondition(func() bool {
+ err = utils.WaitForCondition(func() bool {
for {
select {
case event := <-recorder.Events:
@@ -977,6 +981,84 @@ func TestNodeEventPublishedCorrectly(t *testing.T) {
assert.NilError(t, err, "event should have been emitted")
}
+func TestFilteredEventsNotPublished(t *testing.T) {
+ conf.GetSchedulerConf().SetTestMode(true)
+ recorder, ok := events.GetRecorder().(*k8sEvents.FakeRecorder)
+ if !ok {
+ t.Fatal("the EventRecorder is expected to be of type FakeRecorder")
+ }
+ context := initContextForTest()
+
+ node := v1.Node{
+ ObjectMeta: apis.ObjectMeta{
+ Name: "host0001",
+ Namespace: "default",
+ UID: "uid_0001",
+ },
+ }
+ context.addNode(&node)
+ err := waitForNodeAcceptedEvent(recorder)
+ assert.NilError(t, err, "node accepted event was not sent")
+
+ eventRecords := make([]*si.EventRecord, 7)
+ eventRecords[0] = &si.EventRecord{
+ Type: si.EventRecord_NODE,
+ EventChangeType: si.EventRecord_SET,
+ EventChangeDetail: si.EventRecord_NODE_SCHEDULABLE,
+ ObjectID: "host0001",
+ Message: "",
+ }
+ eventRecords[1] = &si.EventRecord{
+ Type: si.EventRecord_NODE,
+ EventChangeType: si.EventRecord_SET,
+ EventChangeDetail: si.EventRecord_NODE_READY,
+ ObjectID: "host0001",
+ Message: "",
+ }
+ eventRecords[2] = &si.EventRecord{
+ Type: si.EventRecord_NODE,
+ EventChangeType: si.EventRecord_SET,
+ EventChangeDetail: si.EventRecord_NODE_OCCUPIED,
+ ObjectID: "host0001",
+ Message: "",
+ }
+ eventRecords[3] = &si.EventRecord{
+ Type: si.EventRecord_NODE,
+ EventChangeType: si.EventRecord_SET,
+ EventChangeDetail: si.EventRecord_NODE_CAPACITY,
+ ObjectID: "host0001",
+ Message: "",
+ }
+ eventRecords[4] = &si.EventRecord{
+ Type: si.EventRecord_NODE,
+ EventChangeType: si.EventRecord_ADD,
+ EventChangeDetail: si.EventRecord_NODE_ALLOC,
+ ObjectID: "host0001",
+ Message: "",
+ }
+ eventRecords[5] = &si.EventRecord{
+ Type: si.EventRecord_APP,
+ EventChangeType: si.EventRecord_ADD,
+ EventChangeDetail: si.EventRecord_APP_STARTING,
+ ObjectID: "app-1",
+ Message: "",
+ }
+ eventRecords[6] = &si.EventRecord{
+ Type: si.EventRecord_QUEUE,
+ EventChangeType: si.EventRecord_ADD,
+ EventChangeDetail: si.EventRecord_DETAILS_NONE,
+ ObjectID: "root.test",
+ Message: "",
+ }
+ context.PublishEvents(eventRecords)
+
+ select {
+ case e := <-recorder.Events:
+ t.Errorf("received an unexpected event %s", e)
+ default:
+ }
+}
+
func TestPublishEventsWithNotExistingAsk(t *testing.T) {
conf.GetSchedulerConf().SetTestMode(true)
recorder, ok := events.GetRecorder().(*k8sEvents.FakeRecorder)
@@ -1497,3 +1579,21 @@ func TestCtxUpdatePodCondition(t *testing.T) {
updated = context.updatePodCondition(task, &condition)
assert.Equal(t, true, updated)
}
+
+func waitForNodeAcceptedEvent(recorder *k8sEvents.FakeRecorder) error {
+ // fetch the "node accepted" event
+ err := utils.WaitForCondition(func() bool {
+ for {
+ select {
+ case event := <-recorder.Events:
+ log.Log(log.Test).Info(event)
+ if strings.Contains(event, "accepted by the scheduler") {
+ return true
+ }
+ default:
+ return false
+ }
+ }
+ }, 10*time.Millisecond, time.Second)
+ return err
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@yunikorn.apache.org
For additional commands, e-mail: issues-help@yunikorn.apache.org