You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@yunikorn.apache.org by pb...@apache.org on 2023/09/11 15:22:26 UTC
[yunikorn-core] 03/03: [YUNIKORN-1942] Null Batch API Response after buffer size change (#635)
This is an automated email from the ASF dual-hosted git repository.
pbacsko pushed a commit to branch yunikorn-1.3.1
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
commit 65d329fc4b73d35dcd66c019968e38e8619fb736
Author: Peter Bacsko <pb...@cloudera.com>
AuthorDate: Fri Sep 8 12:56:20 2023 +1000
[YUNIKORN-1942] Null Batch API Response after buffer size change (#635)
When the ring buffer size is changed, the first ID in the buffer needs to
be updated. Normally the first ID is 0, but after a resize it can be
an arbitrary number. That breaks id2pos and causes a nil return in the
REST call.
Closes: #635
Signed-off-by: Wilfred Spiegelenburg <wi...@apache.org>
(cherry picked from commit ea422103e51844b598988d42e39e97ca2842df82)
---
pkg/events/event_ringbuffer.go | 79 +++++++++++++------------------------
pkg/events/event_ringbuffer_test.go | 14 +++++--
pkg/webservice/routes.go | 2 +-
3 files changed, 39 insertions(+), 56 deletions(-)
diff --git a/pkg/events/event_ringbuffer.go b/pkg/events/event_ringbuffer.go
index 71ab8cfa..479ea70f 100644
--- a/pkg/events/event_ringbuffer.go
+++ b/pkg/events/event_ringbuffer.go
@@ -35,19 +35,21 @@ type eventRange struct {
// eventRingBuffer A specialized circular buffer to store event objects.
//
-// Unlike to regular circular buffers, existing entries are never directly removed and new entries can be added if the buffer is full.
+// Unlike regular circular buffers, existing entries are never directly removed
+// and new entries can be added if the buffer is full.
// In this case, the oldest entry is overwritten and can be collected by the GC.
-// Each event has an ID, however, this mapping is not stored directly. If needed, we calculate the id
-// of the event based on slice positions.
+// Each event has an ID; however, this mapping is not stored directly.
+// If needed, we calculate the id of the event based on slice positions.
//
-// Retrieving the records can be achieved with GetEventsFromID and GetRecentEntries.
+// Retrieving the records can be achieved with GetEventsFromID.
type eventRingBuffer struct {
- events []*si.EventRecord
- capacity uint64 // capacity of the buffer
- head uint64 // position of the next element (no tail since we don't remove elements)
- full bool // indicates whether the buffer if full - once it is, it stays full unless buffer is resized
- id uint64 // unique id of an event record
- lowestId uint64 // lowest id of an event record available in the buffer at any given time
+ events []*si.EventRecord
+ capacity uint64 // capacity of the buffer
+ head uint64 // position of the next element (no tail since we don't remove elements)
+ full bool // indicates whether the buffer is full - once it is, it stays full unless the buffer is resized
+ id uint64 // unique id of an event record
+ lowestId uint64 // lowest id of an event record available in the buffer at any given time
+ resizeOffset uint64 // used to aid the calculation of id->pos after resize (see id2pos)
sync.RWMutex
}
@@ -62,16 +64,13 @@ func (e *eventRingBuffer) Add(event *si.EventRecord) {
if !e.full {
e.full = e.head == e.capacity-1
} else {
- // lowest event id updates when new event added to a full buffer
- log.Log(log.Events).Debug("event buffer full, oldest event will be lost",
- zap.String("id", strconv.FormatUint(e.lowestId, 10)))
e.lowestId++
}
e.head = (e.head + 1) % e.capacity
e.id++
}
-// GetEventsFromID returns "count" number of event records from id if possible. The id can be determined from
+// GetEventsFromID returns "count" number of event records from "id" if possible. The id can be determined from
// the first call of the method - if it returns nothing because the id is not in the buffer, the lowest valid
// identifier is returned which can be used to get the first batch.
// If the caller does not want to pose limit on the number of events returned, "count" must be set to a high
@@ -162,36 +161,18 @@ func (e *eventRingBuffer) getEntriesFromRanges(r1, r2 *eventRange) []*si.EventRe
}
// id2pos translates the unique event ID to an index in the event slice.
-// If the event is present the position will be returned and the found flag will be true.
-// In the case that the event ID is not present the position returned is 0 and the flag false.
+// If the event is present, the position will be returned and the found flag will be true.
+// If the event ID is not present, the position returned is 0 and the flag is false.
func (e *eventRingBuffer) id2pos(id uint64) (uint64, bool) {
- pos := id % e.capacity
- var calculatedID uint64 // calculated ID based on index values
- if pos > e.head {
- diff := pos - e.head
- calculatedID = e.getLowestID() + diff
- } else {
- pId := e.id - 1
- idAtZero := pId - (pId % e.capacity) // unique id at slice position 0
- calculatedID = idAtZero + pos
- }
-
- if !e.full {
- if e.head == 0 {
- // empty
- return 0, false
- }
- if pos >= e.head {
- // "pos" is not in the [0..head-1] range
- return 0, false
- }
+ // id out of range?
+ if id < e.lowestId || id >= e.id {
+ return 0, false
}
- if calculatedID != id {
- return calculatedID, false
- }
-
- return pos, true
+ // resizeOffset tells how many elements were "shifted out" after resizing the buffer
+ // e.g. a buffer with 10 elements is full, then gets resized to 6:
+ // the element at index 0 no longer has id 0 or a multiple of 10, but 4, 10, 16, 22, etc.
+ return (id - e.resizeOffset) % e.capacity, true
}
// getLowestID returns the current lowest available id in the buffer.
@@ -206,7 +187,7 @@ func newEventRingBuffer(capacity uint64) *eventRingBuffer {
}
}
-// called from Resize(), This functuin updates the lowest event id available in the buffer
+// called from Resize(), this function updates the lowest event id available in the buffer
func (e *eventRingBuffer) updateLowestId(beginSize, endSize uint64) {
// if buffer size is increasing, lowestId stays the same
if beginSize < endSize {
@@ -214,7 +195,7 @@ func (e *eventRingBuffer) updateLowestId(beginSize, endSize uint64) {
}
// bufferSize is shrinking
- // if number of events is < newSize no change
+ // if the number of events is < newSize, then no change
if (e.id - e.getLowestID()) <= endSize {
return
}
@@ -223,22 +204,18 @@ func (e *eventRingBuffer) updateLowestId(beginSize, endSize uint64) {
e.lowestId = e.id - endSize
}
-// resize the existing ring buffer
+// Resize resizes the existing ring buffer
// this method will be called upon configuration reload
func (e *eventRingBuffer) Resize(newSize uint64) {
e.Lock()
defer e.Unlock()
if newSize == e.capacity {
- return // Nothing to do if the size is the same
+ return
}
initialSize := e.capacity
-
- // Create a new buffer with the desired size
newEvents := make([]*si.EventRecord, newSize)
-
- // Determine the number of events to copy
var numEventsToCopy uint64
if e.id-e.getLowestID() > newSize {
numEventsToCopy = newSize
@@ -270,11 +247,9 @@ func (e *eventRingBuffer) Resize(newSize uint64) {
copy(newEvents[e.capacity-startIndex:], e.events[:endIndex+1])
}
- // Update the buffer's state
e.capacity = newSize
e.events = newEvents
e.head = numEventsToCopy % newSize
-
- // Update e.full based on whether the buffer is full after resizing
+ e.resizeOffset = e.lowestId
e.full = numEventsToCopy == e.capacity
}
diff --git a/pkg/events/event_ringbuffer_test.go b/pkg/events/event_ringbuffer_test.go
index b155f850..6b390e69 100644
--- a/pkg/events/event_ringbuffer_test.go
+++ b/pkg/events/event_ringbuffer_test.go
@@ -192,6 +192,7 @@ func TestResize(t *testing.T) {
assert.Equal(t, uint64(6), ringBuffer.capacity)
assert.Equal(t, lastEventIdBeforeResize, ringBuffer.getLastEventID())
assert.Equal(t, 6, len(ringBuffer.events))
+ assert.Equal(t, uint64(0), ringBuffer.resizeOffset)
// Test case 2: Resize to a smaller size
lastEventIdBeforeResize = ringBuffer.GetLastEventID()
@@ -199,6 +200,7 @@ func TestResize(t *testing.T) {
assert.Equal(t, uint64(2), ringBuffer.capacity)
assert.Equal(t, lastEventIdBeforeResize, ringBuffer.getLastEventID())
assert.Equal(t, 2, len(ringBuffer.events))
+ assert.Equal(t, uint64(2), ringBuffer.resizeOffset)
// Test case 3: Resize to a larger size
lastEventIdBeforeResize = ringBuffer.GetLastEventID()
@@ -206,6 +208,7 @@ func TestResize(t *testing.T) {
assert.Equal(t, uint64(20), ringBuffer.capacity)
assert.Equal(t, lastEventIdBeforeResize, ringBuffer.getLastEventID())
assert.Equal(t, 20, len(ringBuffer.events))
+ assert.Equal(t, uint64(2), ringBuffer.resizeOffset)
// Test case 4: Resize when head is at the last element
ringBuffer = newEventRingBuffer(5)
@@ -215,6 +218,7 @@ func TestResize(t *testing.T) {
assert.Equal(t, uint64(2), ringBuffer.capacity)
assert.Equal(t, lastEventIdBeforeResize, ringBuffer.getLastEventID())
assert.Equal(t, 2, len(ringBuffer.events))
+ assert.Equal(t, uint64(2), ringBuffer.resizeOffset)
// Test case 5: Resize to events length when head is at the last element
ringBuffer = newEventRingBuffer(5)
@@ -225,7 +229,8 @@ func TestResize(t *testing.T) {
assert.Equal(t, uint64(4), ringBuffer.capacity)
assert.Equal(t, lastEventIdBeforeResize, ringBuffer.getLastEventID())
assert.Equal(t, 4, len(ringBuffer.events))
- assert.Equal(t, true, ringBuffer.full)
+ assert.Equal(t, uint64(0), ringBuffer.resizeOffset)
+ assert.Assert(t, ringBuffer.full)
// Test case 6: Resize when the buffer is full
ringBuffer = newEventRingBuffer(10)
@@ -237,6 +242,7 @@ func TestResize(t *testing.T) {
assert.Equal(t, 6, len(ringBuffer.events))
assert.Equal(t, uint64(0), ringBuffer.head)
assert.Equal(t, true, ringBuffer.full)
+ assert.Equal(t, uint64(4), ringBuffer.resizeOffset)
// Test case 7: Resize when the buffer is overflown (head is wrapped and position > 0)
ringBuffer = newEventRingBuffer(10)
@@ -248,7 +254,8 @@ func TestResize(t *testing.T) {
assert.Equal(t, lastEventIdBeforeResize, ringBuffer.getLastEventID())
assert.Equal(t, 8, len(ringBuffer.events))
assert.Equal(t, uint64(0), ringBuffer.head)
- assert.Equal(t, true, ringBuffer.full)
+ assert.Equal(t, uint64(7), ringBuffer.resizeOffset)
+ assert.Assert(t, ringBuffer.full)
// Test case 8: Test event full : Resize to lower size, followed by resize to a large size
ringBuffer = newEventRingBuffer(10)
@@ -258,6 +265,7 @@ func TestResize(t *testing.T) {
assert.Equal(t, true, ringBuffer.full)
ringBuffer.Resize(6)
assert.Equal(t, false, ringBuffer.full)
+ assert.Equal(t, uint64(7), ringBuffer.resizeOffset)
// Test case 9: Test resize to same size
lastEventIdBeforeResize = ringBuffer.GetLastEventID()
@@ -266,7 +274,7 @@ func TestResize(t *testing.T) {
assert.Equal(t, lastEventIdBeforeResize, ringBuffer.getLastEventID())
assert.Equal(t, 6, len(ringBuffer.events))
assert.Equal(t, false, ringBuffer.full)
-
+ assert.Equal(t, uint64(7), ringBuffer.resizeOffset)
}
func populate(buffer *eventRingBuffer, count int) {
diff --git a/pkg/webservice/routes.go b/pkg/webservice/routes.go
index 632a6dbc..bbe1629c 100644
--- a/pkg/webservice/routes.go
+++ b/pkg/webservice/routes.go
@@ -185,7 +185,7 @@ var webRoutes = routes{
route{
"Scheduler",
"GET",
- "/ws/v1/events/batch/",
+ "/ws/v1/events/batch",
getEvents,
},
// endpoint to retrieve CPU, Memory profiling data,
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@yunikorn.apache.org
For additional commands, e-mail: issues-help@yunikorn.apache.org