You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@yunikorn.apache.org by pb...@apache.org on 2023/09/11 15:22:26 UTC

[yunikorn-core] 03/03: [YUNIKORN-1942] Null Batch API Response after buffer size change (#635)

This is an automated email from the ASF dual-hosted git repository.

pbacsko pushed a commit to branch yunikorn-1.3.1
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git

commit 65d329fc4b73d35dcd66c019968e38e8619fb736
Author: Peter Bacsko <pb...@cloudera.com>
AuthorDate: Fri Sep 8 12:56:20 2023 +1000

    [YUNIKORN-1942] Null Batch API Response after buffer size change (#635)
    
    When the ring buffer is resized, the first ID in the buffer needs to
    be updated. Normally the first ID is 0, but after a resize it can be
    any value (the lowest ID kept in the buffer). That breaks id2pos and
    causes the REST call to return a nil response.
    
    Closes: #635
    
    Signed-off-by: Wilfred Spiegelenburg <wi...@apache.org>
    (cherry picked from commit ea422103e51844b598988d42e39e97ca2842df82)
---
 pkg/events/event_ringbuffer.go      | 79 +++++++++++++------------------------
 pkg/events/event_ringbuffer_test.go | 14 +++++--
 pkg/webservice/routes.go            |  2 +-
 3 files changed, 39 insertions(+), 56 deletions(-)

diff --git a/pkg/events/event_ringbuffer.go b/pkg/events/event_ringbuffer.go
index 71ab8cfa..479ea70f 100644
--- a/pkg/events/event_ringbuffer.go
+++ b/pkg/events/event_ringbuffer.go
@@ -35,19 +35,21 @@ type eventRange struct {
 
 // eventRingBuffer A specialized circular buffer to store event objects.
 //
-// Unlike to regular circular buffers, existing entries are never directly removed and new entries can be added if the buffer is full.
+// Unlike regular circular buffers, existing entries are never directly removed
+// and new entries can be added if the buffer is full.
 // In this case, the oldest entry is overwritten and can be collected by the GC.
-// Each event has an ID, however, this mapping is not stored directly. If needed, we calculate the id
-// of the event based on slice positions.
+// Each event has an ID; however, this mapping is not stored directly.
+// If needed, we calculate the id of the event based on slice positions.
 //
-// Retrieving the records can be achieved with GetEventsFromID and GetRecentEntries.
+// Retrieving the records can be achieved with GetEventsFromID.
 type eventRingBuffer struct {
-	events   []*si.EventRecord
-	capacity uint64 // capacity of the buffer
-	head     uint64 // position of the next element (no tail since we don't remove elements)
-	full     bool   // indicates whether the buffer if full - once it is, it stays full unless buffer is resized
-	id       uint64 // unique id of an event record
-	lowestId uint64 // lowest id of an event record available in the buffer at any given time
+	events       []*si.EventRecord
+	capacity     uint64 // capacity of the buffer
+	head         uint64 // position of the next element (no tail since we don't remove elements)
+	full         bool   // indicates whether the buffer is full - once it is, it stays full unless the buffer is resized
+	id           uint64 // unique id of an event record
+	lowestId     uint64 // lowest id of an event record available in the buffer at any given time
+	resizeOffset uint64 // used to aid the calculation of id->pos after resize (see id2pos)
 
 	sync.RWMutex
 }
@@ -62,16 +64,13 @@ func (e *eventRingBuffer) Add(event *si.EventRecord) {
 	if !e.full {
 		e.full = e.head == e.capacity-1
 	} else {
-		// lowest event id updates when new event added to a full buffer
-		log.Log(log.Events).Debug("event buffer full, oldest event will be lost",
-			zap.String("id", strconv.FormatUint(e.lowestId, 10)))
 		e.lowestId++
 	}
 	e.head = (e.head + 1) % e.capacity
 	e.id++
 }
 
-// GetEventsFromID returns "count" number of event records from id if possible. The id can be determined from
+// GetEventsFromID returns "count" number of event records from "id" if possible. The id can be determined from
 // the first call of the method - if it returns nothing because the id is not in the buffer, the lowest valid
 // identifier is returned which can be used to get the first batch.
 // If the caller does not want to pose limit on the number of events returned, "count" must be set to a high
@@ -162,36 +161,18 @@ func (e *eventRingBuffer) getEntriesFromRanges(r1, r2 *eventRange) []*si.EventRe
 }
 
 // id2pos translates the unique event ID to an index in the event slice.
-// If the event is present the position will be returned and the found flag will be true.
-// In the case that the event ID is not present the position returned is 0 and the flag false.
+// If the event is present, the position will be returned and the found flag will be true.
+// If the event ID is not present, the position returned is 0 and the flag is false.
 func (e *eventRingBuffer) id2pos(id uint64) (uint64, bool) {
-	pos := id % e.capacity
-	var calculatedID uint64 // calculated ID based on index values
-	if pos > e.head {
-		diff := pos - e.head
-		calculatedID = e.getLowestID() + diff
-	} else {
-		pId := e.id - 1
-		idAtZero := pId - (pId % e.capacity) // unique id at slice position 0
-		calculatedID = idAtZero + pos
-	}
-
-	if !e.full {
-		if e.head == 0 {
-			// empty
-			return 0, false
-		}
-		if pos >= e.head {
-			// "pos" is not in the [0..head-1] range
-			return 0, false
-		}
+	// id out of range?
+	if id < e.lowestId || id >= e.id {
+		return 0, false
 	}
 
-	if calculatedID != id {
-		return calculatedID, false
-	}
-
-	return pos, true
+	// resizeOffset is the ID that the last resize placed at index 0 (the lowest ID kept),
+	// e.g. a full buffer with 10 elements resized to 6 keeps IDs 4..9, so index 0 now holds
+	// IDs 4, 10, 16, 22, etc. instead of 0 and the multiples of 10.
+	return (id - e.resizeOffset) % e.capacity, true
 }
 
 // getLowestID returns the current lowest available id in the buffer.
@@ -206,7 +187,7 @@ func newEventRingBuffer(capacity uint64) *eventRingBuffer {
 	}
 }
 
-// called from Resize(), This functuin updates the lowest event id available in the buffer
+// called from Resize(), this function updates the lowest event id available in the buffer
 func (e *eventRingBuffer) updateLowestId(beginSize, endSize uint64) {
 	// if buffer size is increasing, lowestId stays the same
 	if beginSize < endSize {
@@ -214,7 +195,7 @@ func (e *eventRingBuffer) updateLowestId(beginSize, endSize uint64) {
 	}
 
 	// bufferSize is shrinking
-	// if number of events is < newSize no change
+	// if the number of events is < newSize, then no change
 	if (e.id - e.getLowestID()) <= endSize {
 		return
 	}
@@ -223,22 +204,18 @@ func (e *eventRingBuffer) updateLowestId(beginSize, endSize uint64) {
 	e.lowestId = e.id - endSize
 }
 
-// resize the existing ring buffer
+// Resize resizes the existing ring buffer
 // this method will be called upon configuration reload
 func (e *eventRingBuffer) Resize(newSize uint64) {
 	e.Lock()
 	defer e.Unlock()
 
 	if newSize == e.capacity {
-		return // Nothing to do if the size is the same
+		return
 	}
 
 	initialSize := e.capacity
-
-	// Create a new buffer with the desired size
 	newEvents := make([]*si.EventRecord, newSize)
-
-	// Determine the number of events to copy
 	var numEventsToCopy uint64
 	if e.id-e.getLowestID() > newSize {
 		numEventsToCopy = newSize
@@ -270,11 +247,9 @@ func (e *eventRingBuffer) Resize(newSize uint64) {
 		copy(newEvents[e.capacity-startIndex:], e.events[:endIndex+1])
 	}
 
-	// Update the buffer's state
 	e.capacity = newSize
 	e.events = newEvents
 	e.head = numEventsToCopy % newSize
-
-	// Update e.full based on whether the buffer is full after resizing
+	e.resizeOffset = e.lowestId
 	e.full = numEventsToCopy == e.capacity
 }
diff --git a/pkg/events/event_ringbuffer_test.go b/pkg/events/event_ringbuffer_test.go
index b155f850..6b390e69 100644
--- a/pkg/events/event_ringbuffer_test.go
+++ b/pkg/events/event_ringbuffer_test.go
@@ -192,6 +192,7 @@ func TestResize(t *testing.T) {
 	assert.Equal(t, uint64(6), ringBuffer.capacity)
 	assert.Equal(t, lastEventIdBeforeResize, ringBuffer.getLastEventID())
 	assert.Equal(t, 6, len(ringBuffer.events))
+	assert.Equal(t, uint64(0), ringBuffer.resizeOffset)
 
 	// Test case 2: Resize to a smaller size
 	lastEventIdBeforeResize = ringBuffer.GetLastEventID()
@@ -199,6 +200,7 @@ func TestResize(t *testing.T) {
 	assert.Equal(t, uint64(2), ringBuffer.capacity)
 	assert.Equal(t, lastEventIdBeforeResize, ringBuffer.getLastEventID())
 	assert.Equal(t, 2, len(ringBuffer.events))
+	assert.Equal(t, uint64(2), ringBuffer.resizeOffset)
 
 	// Test case 3: Resize to a larger size
 	lastEventIdBeforeResize = ringBuffer.GetLastEventID()
@@ -206,6 +208,7 @@ func TestResize(t *testing.T) {
 	assert.Equal(t, uint64(20), ringBuffer.capacity)
 	assert.Equal(t, lastEventIdBeforeResize, ringBuffer.getLastEventID())
 	assert.Equal(t, 20, len(ringBuffer.events))
+	assert.Equal(t, uint64(2), ringBuffer.resizeOffset)
 
 	// Test case 4: Resize when head is at the last element
 	ringBuffer = newEventRingBuffer(5)
@@ -215,6 +218,7 @@ func TestResize(t *testing.T) {
 	assert.Equal(t, uint64(2), ringBuffer.capacity)
 	assert.Equal(t, lastEventIdBeforeResize, ringBuffer.getLastEventID())
 	assert.Equal(t, 2, len(ringBuffer.events))
+	assert.Equal(t, uint64(2), ringBuffer.resizeOffset)
 
 	// Test case 5: Resize to events length when head is at the last element
 	ringBuffer = newEventRingBuffer(5)
@@ -225,7 +229,8 @@ func TestResize(t *testing.T) {
 	assert.Equal(t, uint64(4), ringBuffer.capacity)
 	assert.Equal(t, lastEventIdBeforeResize, ringBuffer.getLastEventID())
 	assert.Equal(t, 4, len(ringBuffer.events))
-	assert.Equal(t, true, ringBuffer.full)
+	assert.Equal(t, uint64(0), ringBuffer.resizeOffset)
+	assert.Assert(t, ringBuffer.full)
 
 	// Test case 6: Resize when the buffer is full
 	ringBuffer = newEventRingBuffer(10)
@@ -237,6 +242,7 @@ func TestResize(t *testing.T) {
 	assert.Equal(t, 6, len(ringBuffer.events))
 	assert.Equal(t, uint64(0), ringBuffer.head)
 	assert.Equal(t, true, ringBuffer.full)
+	assert.Equal(t, uint64(4), ringBuffer.resizeOffset)
 
 	// Test case 7: Resize when the buffer is overflown (head is wrapped and position > 0)
 	ringBuffer = newEventRingBuffer(10)
@@ -248,7 +254,8 @@ func TestResize(t *testing.T) {
 	assert.Equal(t, lastEventIdBeforeResize, ringBuffer.getLastEventID())
 	assert.Equal(t, 8, len(ringBuffer.events))
 	assert.Equal(t, uint64(0), ringBuffer.head)
-	assert.Equal(t, true, ringBuffer.full)
+	assert.Equal(t, uint64(7), ringBuffer.resizeOffset)
+	assert.Assert(t, ringBuffer.full)
 
 	// Test case 8: Test event full : Resize to lower size, followed by resize to a large size
 	ringBuffer = newEventRingBuffer(10)
@@ -258,6 +265,7 @@ func TestResize(t *testing.T) {
 	assert.Equal(t, true, ringBuffer.full)
 	ringBuffer.Resize(6)
 	assert.Equal(t, false, ringBuffer.full)
+	assert.Equal(t, uint64(7), ringBuffer.resizeOffset)
 
 	// Test case 9: Test resize to same size
 	lastEventIdBeforeResize = ringBuffer.GetLastEventID()
@@ -266,7 +274,7 @@ func TestResize(t *testing.T) {
 	assert.Equal(t, lastEventIdBeforeResize, ringBuffer.getLastEventID())
 	assert.Equal(t, 6, len(ringBuffer.events))
 	assert.Equal(t, false, ringBuffer.full)
-
+	assert.Equal(t, uint64(7), ringBuffer.resizeOffset)
 }
 
 func populate(buffer *eventRingBuffer, count int) {
diff --git a/pkg/webservice/routes.go b/pkg/webservice/routes.go
index 632a6dbc..bbe1629c 100644
--- a/pkg/webservice/routes.go
+++ b/pkg/webservice/routes.go
@@ -185,7 +185,7 @@ var webRoutes = routes{
 	route{
 		"Scheduler",
 		"GET",
-		"/ws/v1/events/batch/",
+		"/ws/v1/events/batch",
 		getEvents,
 	},
 	// endpoint to retrieve CPU, Memory profiling data,


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@yunikorn.apache.org
For additional commands, e-mail: issues-help@yunikorn.apache.org