Posted to reviews@yunikorn.apache.org by "pbacsko (via GitHub)" <gi...@apache.org> on 2023/04/28 11:08:53 UTC

[GitHub] [yunikorn-core] pbacsko opened a new pull request, #533: [WIP] [YUNIKORN-1709] Store events separately for event streaming

pbacsko opened a new pull request, #533:
URL: https://github.com/apache/yunikorn-core/pull/533

   ### What is this PR for?
   Store events separately for event streaming.
   Implement client interface to request stream of events.
   
   ### What type of PR is it?
   * [ ] - Bug Fix
   * [ ] - Improvement
   * [x] - Feature
   * [ ] - Documentation
   * [ ] - Hot Fix
   * [ ] - Refactoring
   
   ### Todos
   * [ ] - Task
   
   ### What is the Jira issue?
   https://issues.apache.org/jira/browse/YUNIKORN-1709
   
   ### How should this be tested?
   
   ### Screenshots (if appropriate)
   
   ### Questions:
   * [ ] - The license files need an update.
   * [ ] - There are breaking changes for older versions.
   * [ ] - It needs documentation.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@yunikorn.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [YUNIKORN-1709] Add event streaming logic [yunikorn-core]

Posted by "zhuqi-lucas (via GitHub)" <gi...@apache.org>.
zhuqi-lucas commented on code in PR #533:
URL: https://github.com/apache/yunikorn-core/pull/533#discussion_r1409272980


##########
pkg/events/event_streaming.go:
##########
@@ -0,0 +1,180 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package events
+
+import (
+	"sync"
+	"time"
+
+	"go.uber.org/zap"
+
+	"github.com/apache/yunikorn-core/pkg/log"
+	"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
+)
+
+const defaultChannelBufSize = 100
+
+// EventStreaming implements the event streaming logic.
+// New events are immediately forwarded to all active consumers.
+type EventStreaming struct {
+	buffer       *eventRingBuffer
+	stopCh       chan struct{}
+	eventStreams map[*EventStream]eventConsumerDetails
+	sync.Mutex
+}
+
+type eventConsumerDetails struct {
+	local     chan *si.EventRecord
+	consumer  chan<- *si.EventRecord
+	stopCh    chan struct{}
+	name      string
+	createdAt time.Time
+}
+
+// EventStream handle type returned to the client that wants to capture the stream of events.
+type EventStream struct {
+	Events <-chan *si.EventRecord
+}
+
+// PublishEvent publishes an event to all event stream consumers.
+//
+// The streaming logic uses bridging to ensure proper ordering of existing and new events.
+// Events are sent to the "local" channel from where it is forwarded to the "consumer" channel.
+//
+// If "local" is full, it means that the consumer side has not processed the events at an appropriate pace.
+// Such a consumer is removed and the related channels are closed.
+func (e *EventStreaming) PublishEvent(event *si.EventRecord) {
+	e.Lock()
+	defer e.Unlock()
+
+	for consumer, details := range e.eventStreams {

Review Comment:
    Yeah, we can have a separate service, but we can let that service act as a consumer here, so we don't need to add another publisher. One publisher is enough for the streaming; having several publishers would likely add more overhead.





[GitHub] [yunikorn-core] pbacsko commented on a diff in pull request #533: [WIP] [YUNIKORN-1709] Store events separately for event streaming

Posted by "pbacsko (via GitHub)" <gi...@apache.org>.
pbacsko commented on code in PR #533:
URL: https://github.com/apache/yunikorn-core/pull/533#discussion_r1181641754


##########
pkg/events/event_ringbuffer.go:
##########
@@ -0,0 +1,175 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package events
+
+import (
+	"time"
+
+	"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
+)
+
+const latestUnset int64 = -1 << 63
+
+var now = time.Now
+
+// EventRingBuffer A specialized circular buffer to store event objects.
+//
+// Unlike regular circular buffers, new entries can be added if the buffer is full. In this case,
+// the oldest entry is overwritten. This is not a classic enqueue operation, so it's named differently.
+//
+// Retrieving the records can be achieved with GetLatestEntriesCount and GetLatestEntries. Since these do not
+// remove the elements, they are not regular dequeue operations either.
+//
+// Entries have a maximum lifespan defined in nanoseconds. Cleanup of expired objects occurs when a call to
+// RemoveExpiredEntries is made.
+//
+// The buffer does not use locking, proper synchronization is delegated to the client.
+type EventRingBuffer struct {
+	events        []*si.EventRecord

Review Comment:
   IMO we need to switch to `[]si.EventRecord` if we want to store a lot of elements. We want to avoid GC interactions with pointers, so using a value type here is quite reasonable.
   
   In fact, I'm considering a special internal type here instead of `EventRecord`, with interned strings. There will be a lot of duplicated strings in the events (especially IDs).
   This library looks promising: https://github.com/philpearl/intern
   
   We can do this in https://issues.apache.org/jira/browse/YUNIKORN-1713.
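A minimal sketch of the two ideas in this comment, storing records by value and interning repeated ID strings. The `record`, `interner`, and `valueRing` names are hypothetical and are not part of the PR; `record` is only a stand-in for `si.EventRecord`. The sketch assumes a capacity greater than zero and a single goroutine. Note that Go strings still carry a pointer, so a value slice mainly reduces the number of separate heap objects rather than removing pointers altogether.

```go
package main

import "fmt"

// record is a hypothetical stand-in for si.EventRecord (not the real type).
type record struct {
	ObjectID    string
	Message     string
	TimestampNs int64
}

// interner returns one canonical copy of each distinct string,
// so repeated IDs share the same backing storage.
// Assumption: the map is allowed to grow; a real version would need eviction.
type interner map[string]string

func (in interner) intern(s string) string {
	if canonical, ok := in[s]; ok {
		return canonical
	}
	in[s] = s
	return s
}

// valueRing stores records by value in a fixed-size slice.
// When the buffer is full, the oldest entry is overwritten.
type valueRing struct {
	events []record
	head   int // index of the next write
	size   int // number of valid entries
	ids    interner
}

// newValueRing assumes capacity > 0.
func newValueRing(capacity int) *valueRing {
	return &valueRing{events: make([]record, capacity), ids: interner{}}
}

func (r *valueRing) add(rec record) {
	rec.ObjectID = r.ids.intern(rec.ObjectID)
	r.events[r.head] = rec
	r.head = (r.head + 1) % len(r.events)
	if r.size < len(r.events) {
		r.size++
	}
}

// latest returns up to n of the most recent records, oldest first.
func (r *valueRing) latest(n int) []record {
	if n > r.size {
		n = r.size
	}
	out := make([]record, 0, n)
	start := (r.head - n + len(r.events)) % len(r.events)
	for i := 0; i < n; i++ {
		out = append(out, r.events[(start+i)%len(r.events)])
	}
	return out
}

func main() {
	ring := newValueRing(3)
	for i := 1; i <= 5; i++ {
		ring.add(record{ObjectID: "app-1", Message: fmt.Sprintf("event %d", i), TimestampNs: int64(i)})
	}
	// Only the newest entries survive; the two oldest were overwritten.
	for _, rec := range ring.latest(2) {
		fmt.Println(rec.ObjectID, rec.Message)
	}
}
```

With a capacity of 3 and five additions, `latest(2)` returns the fourth and fifth records, in that order.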





[GitHub] [yunikorn-core] pbacsko commented on a diff in pull request #533: [WIP] [YUNIKORN-1709] Store events separately for event streaming

Posted by "pbacsko (via GitHub)" <gi...@apache.org>.
pbacsko commented on code in PR #533:
URL: https://github.com/apache/yunikorn-core/pull/533#discussion_r1181658629


##########
pkg/events/event_streaming.go:
##########
@@ -0,0 +1,192 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package events
+
+import (
+	"sync"
+	"time"
+
+	"go.uber.org/zap"
+
+	"github.com/apache/yunikorn-core/pkg/log"
+	"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
+)
+
+const defaultChannelBufSize = 100
+
+type EventStreaming struct {
+	buffer       *EventRingBuffer
+	stopCh       chan struct{}
+	eventStreams map[EventStream]consumerDetails
+	sync.Mutex
+}
+
+type consumerDetails struct {
+	local     chan *si.EventRecord
+	consumer  chan<- si.EventRecord
+	stopCh    chan struct{}
+	name      string
+	createdAt time.Time
+}
+
+// EventStream the type returned to the client who wants to capture the stream of events.
+type EventStream struct {
+	events <-chan si.EventRecord
+}
+
+type EventHistoryOpts struct {
+	interval   time.Duration
+	eventCount int
+}
+
+// PublishEvent publishes an event to event stream consumers. Events are first stored in a ring buffer,
+// then all clients are notified. Clients whose buffer is full are considered slow and removed.
+func (e *EventStreaming) PublishEvent(event *si.EventRecord) {
+	e.Lock()
+	defer e.Unlock()
+
+	e.buffer.Add(event)
+	for consumer, details := range e.eventStreams {
+		if len(details.local) == defaultChannelBufSize {
+			log.Logger().Warn("listener buffer full due to potentially slow consumer, removing it")
+			e.removeEventStream(consumer)
+			continue
+		}
+
+		details.local <- event
+	}
+}
+
+// CreateEventStream sets up event streaming for a consumer. The returned EventStream object
+// contains a channel that can be used for reading.
+// When a consumer is finished, it is supposed to call RemoveEventStream to free up resources.
+// Consumers have an arbitrary name for logging purposes. The "opts" argument can be used to retrieve historical
+// entries.
+func (e *EventStreaming) CreateEventStream(name string, opts EventHistoryOpts) EventStream {
+	consumer := make(chan si.EventRecord, defaultChannelBufSize)
+	stream := EventStream{
+		events: consumer,
+	}
+	local := make(chan *si.EventRecord, defaultChannelBufSize)
+	stop := make(chan struct{})
+	history := e.createEventStreamInternal(stream, local, consumer, stop, name, opts)
+
+	go func(consumer chan<- si.EventRecord, local <-chan *si.EventRecord, stop <-chan struct{}) {

Review Comment:
   So the idea here is this: first we need to send event history which cannot interfere with new events. So we send out the history, while new events are collected in the channel buffer of `local`. Then we continue bridging.
   
   It's possible to avoid this extra layer of indirection, but then we need to add states (processing history / can send) and have to collect new events in a slice. When we're done sending historical events, we need to send out the collected events, and processing of new events cannot continue (like in `PodEventHandler.RecoveryDone()`). I don't like this because detecting slow consumers is more difficult: after sending out the history, `len(consumer)` might be close to full. Then we start sending the events from the temporary slice and the channel gets full, but we can't tell whether that's because we're too fast or because the receiver side is slow.
   
   This wouldn't be an issue if we didn't support sending past events, but there you go.
   So to me, this extra bridging is much more convenient.
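The bridging described above can be sketched roughly as follows. This is a simplified illustration, not the PR's code: events are plain strings, the `bridge` function is a hypothetical name, and closing `local` is used here only to end the demo (the PR stops the goroutine through stop channels instead).

```go
package main

import "fmt"

// bridge sends the history to consumer first, then relays whatever arrives on local.
// Events published while the history is being sent wait in local's buffer,
// so historical and new events cannot interleave.
func bridge(history []string, local <-chan string, consumer chan<- string, stop <-chan struct{}) {
	defer close(consumer)
	for _, ev := range history {
		select {
		case consumer <- ev:
		case <-stop:
			return
		}
	}
	for {
		select {
		case <-stop:
			return
		case ev, ok := <-local:
			if !ok {
				return // demo-only termination: local was closed
			}
			select {
			case consumer <- ev:
			case <-stop:
				return
			}
		}
	}
}

func main() {
	local := make(chan string, 4)
	consumer := make(chan string, 4)
	stop := make(chan struct{})

	// A new event arrives before the bridge has started sending history.
	local <- "new-1"
	go bridge([]string{"old-1", "old-2"}, local, consumer, stop)
	local <- "new-2"
	close(local)

	// Prints old-1, old-2, new-1, new-2: history first, then new events in order.
	for ev := range consumer {
		fmt.Println(ev)
	}
}
```

Because the publisher only ever touches `local`, its fill level reflects how far the consumer is behind on new events, independent of how much history was sent.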





Re: [PR] [YUNIKORN-1709] Add event streaming logic [yunikorn-core]

Posted by "chia7712 (via GitHub)" <gi...@apache.org>.
chia7712 commented on code in PR #533:
URL: https://github.com/apache/yunikorn-core/pull/533#discussion_r1481741123


##########
pkg/events/event_streaming.go:
##########
@@ -0,0 +1,179 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package events
+
+import (
+	"sync"
+	"time"
+
+	"go.uber.org/zap"
+
+	"github.com/apache/yunikorn-core/pkg/log"
+	"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
+)
+
+const defaultChannelBufSize = 1000
+
+// EventStreaming implements the event streaming logic.
+// New events are immediately forwarded to all active consumers.
+type EventStreaming struct {
+	buffer       *eventRingBuffer
+	stopCh       chan struct{}
+	eventStreams map[*EventStream]eventConsumerDetails
+	sync.Mutex
+}
+
+type eventConsumerDetails struct {
+	local     chan *si.EventRecord
+	consumer  chan<- *si.EventRecord
+	stopCh    chan struct{}
+	name      string
+	createdAt time.Time
+}
+
+// EventStream handle type returned to the client that wants to capture the stream of events.
+type EventStream struct {
+	Events <-chan *si.EventRecord
+}
+
+// PublishEvent publishes an event to all event stream consumers.
+//
+// The streaming logic uses bridging to ensure proper ordering of existing and new events.
+// Events are sent to the "local" channel from where it is forwarded to the "consumer" channel.
+//
+// If "local" is full, it means that the consumer side has not processed the events at an appropriate pace.
+// Such a consumer is removed and the related channels are closed.
+func (e *EventStreaming) PublishEvent(event *si.EventRecord) {
+	e.Lock()
+	defer e.Unlock()
+
+	for consumer, details := range e.eventStreams {
+		if len(details.local) == defaultChannelBufSize {
+			log.Log(log.Events).Warn("Listener buffer full due to potentially slow consumer, removing it")
+			e.removeEventStream(consumer)
+			continue
+		}
+
+		details.local <- event
+	}
+}
+
+// CreateEventStream sets up event streaming for a consumer. The returned EventStream object
+// contains a channel that can be used for reading.
+//
+// When a consumer is finished, it must call RemoveEventStream to free up resources.
+//
+// Consumers have an arbitrary name for logging purposes. The "count" parameter defines the number
+// of maximum historical events from the ring buffer. "0" is a valid value and means no past events.
+func (e *EventStreaming) CreateEventStream(name string, count uint64) *EventStream {
+	consumer := make(chan *si.EventRecord, defaultChannelBufSize)
+	stream := &EventStream{
+		Events: consumer,
+	}
+	local := make(chan *si.EventRecord, defaultChannelBufSize)
+	stop := make(chan struct{})
+	e.createEventStreamInternal(stream, local, consumer, stop, name, count)
+	history := e.buffer.GetRecentEvents(count)
+
+	go func(consumer chan<- *si.EventRecord, local <-chan *si.EventRecord, stop <-chan struct{}) {
+		// Store the refs of historical events; it's possible that some events are added to the
+		// ring buffer and also to "local" channel.
+		// It is because we use two separate locks, so event updates are not atomic.
+		// Example: an event has been just added to the ring buffer (before createEventStreamInternal()),
+		// and execution is about to enter PublishEvent(); at this point we have an updated "eventStreams"
+		// map, so "local" will also contain the new event.
+		seen := make(map[*si.EventRecord]bool)
+		for _, event := range history {
+			consumer <- event
+			seen[event] = true
+		}
+		for {
+			select {
+			case <-e.stopCh:
+				close(consumer)
+				return
+			case <-stop:
+				close(consumer)
+				return
+			case event := <-local:
+				if seen[event] {
+					continue
+				}
+				// since events are processed in a single goroutine, doubling is no longer
+				// possible at this point
+				seen = make(map[*si.EventRecord]bool)
+				consumer <- event
+			}
+		}
+	}(consumer, local, stop)
+
+	log.Log(log.Events).Info("Created event stream", zap.String("consumer name", name))
+	return stream
+}
+
+func (e *EventStreaming) createEventStreamInternal(stream *EventStream,
+	local chan *si.EventRecord,
+	consumer chan *si.EventRecord,
+	stop chan struct{},
+	name string,
+	count uint64) {

Review Comment:
   This parameter is unused. Is it a potential bug, or can we just remove it?





Re: [PR] [YUNIKORN-1709] Add event streaming logic [yunikorn-core]

Posted by "zhuqi-lucas (via GitHub)" <gi...@apache.org>.
zhuqi-lucas commented on code in PR #533:
URL: https://github.com/apache/yunikorn-core/pull/533#discussion_r1409242383


##########
pkg/events/event_streaming.go:
##########
@@ -0,0 +1,180 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package events
+
+import (
+	"sync"
+	"time"
+
+	"go.uber.org/zap"
+
+	"github.com/apache/yunikorn-core/pkg/log"
+	"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
+)
+
+const defaultChannelBufSize = 100
+
+// EventStreaming implements the event streaming logic.
+// New events are immediately forwarded to all active consumers.
+type EventStreaming struct {
+	buffer       *eventRingBuffer
+	stopCh       chan struct{}
+	eventStreams map[*EventStream]eventConsumerDetails
+	sync.Mutex
+}
+
+type eventConsumerDetails struct {
+	local     chan *si.EventRecord
+	consumer  chan<- *si.EventRecord
+	stopCh    chan struct{}
+	name      string
+	createdAt time.Time
+}
+
+// EventStream handle type returned to the client that wants to capture the stream of events.
+type EventStream struct {
+	Events <-chan *si.EventRecord
+}
+
+// PublishEvent publishes an event to all event stream consumers.
+//
+// The streaming logic uses bridging to ensure proper ordering of existing and new events.
+// Events are sent to the "local" channel from where it is forwarded to the "consumer" channel.
+//
+// If "local" is full, it means that the consumer side has not processed the events at an appropriate pace.
+// Such a consumer is removed and the related channels are closed.
+func (e *EventStreaming) PublishEvent(event *si.EventRecord) {
+	e.Lock()
+	defer e.Unlock()
+
+	for consumer, details := range e.eventStreams {
+		if len(details.local) == defaultChannelBufSize {

Review Comment:
   Minor question: can we make defaultChannelBufSize configurable, or use a dynamic range for the size? 100 seems small for some cases.
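One way a configurable buffer size could look, as a sketch only. The `publisher` type and its methods are hypothetical and much simpler than `EventStreaming`: events are strings, there is no locking (single goroutine assumed), and the full-buffer check uses a non-blocking send instead of comparing `len()` against a constant.

```go
package main

import "fmt"

// publisher is a hypothetical, simplified stand-in for EventStreaming.
type publisher struct {
	bufSize   int // per-consumer channel capacity, set at construction time
	consumers map[string]chan string
}

func newPublisher(bufSize int) *publisher {
	return &publisher{bufSize: bufSize, consumers: make(map[string]chan string)}
}

// subscribe registers a consumer and returns its read-only channel.
func (p *publisher) subscribe(name string) <-chan string {
	ch := make(chan string, p.bufSize)
	p.consumers[name] = ch
	return ch
}

// publish sends without blocking. A consumer whose buffer is full is treated
// as slow: its channel is closed and it is removed. The names of removed
// consumers are returned.
func (p *publisher) publish(ev string) (removed []string) {
	for name, ch := range p.consumers {
		select {
		case ch <- ev:
		default:
			close(ch)
			delete(p.consumers, name) // deleting during range is safe in Go
			removed = append(removed, name)
		}
	}
	return removed
}

func main() {
	p := newPublisher(2)
	p.subscribe("slow") // never reads from its channel
	for i := 1; i <= 3; i++ {
		if removed := p.publish(fmt.Sprintf("event %d", i)); len(removed) > 0 {
			fmt.Println("removed:", removed)
		}
	}
}
```

With a buffer of 2, the third publish finds the channel full and removes the consumer; a larger `bufSize` simply gives consumers more slack before that happens.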





Re: [PR] [YUNIKORN-1709] Add event streaming logic [yunikorn-core]

Posted by "pbacsko (via GitHub)" <gi...@apache.org>.
pbacsko commented on code in PR #533:
URL: https://github.com/apache/yunikorn-core/pull/533#discussion_r1409252511


##########
pkg/events/event_streaming.go:
##########
@@ -0,0 +1,180 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package events
+
+import (
+	"sync"
+	"time"
+
+	"go.uber.org/zap"
+
+	"github.com/apache/yunikorn-core/pkg/log"
+	"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
+)
+
+const defaultChannelBufSize = 100
+
+// EventStreaming implements the event streaming logic.
+// New events are immediately forwarded to all active consumers.
+type EventStreaming struct {
+	buffer       *eventRingBuffer
+	stopCh       chan struct{}
+	eventStreams map[*EventStream]eventConsumerDetails
+	sync.Mutex
+}
+
+type eventConsumerDetails struct {
+	local     chan *si.EventRecord
+	consumer  chan<- *si.EventRecord
+	stopCh    chan struct{}
+	name      string
+	createdAt time.Time
+}
+
+// EventStream handle type returned to the client that wants to capture the stream of events.
+type EventStream struct {
+	Events <-chan *si.EventRecord
+}
+
+// PublishEvent publishes an event to all event stream consumers.
+//
+// The streaming logic uses bridging to ensure proper ordering of existing and new events.
+// Events are sent to the "local" channel from where it is forwarded to the "consumer" channel.
+//
+// If "local" is full, it means that the consumer side has not processed the events at an appropriate pace.
+// Such a consumer is removed and the related channels are closed.
+func (e *EventStreaming) PublishEvent(event *si.EventRecord) {
+	e.Lock()
+	defer e.Unlock()
+
+	for consumer, details := range e.eventStreams {
+		if len(details.local) == defaultChannelBufSize {

Review Comment:
   Yes, 100 might be a bit too low. 1000 sounds like a reasonable value.





Re: [PR] [YUNIKORN-1709] Add event streaming logic [yunikorn-core]

Posted by "pbacsko (via GitHub)" <gi...@apache.org>.
pbacsko commented on code in PR #533:
URL: https://github.com/apache/yunikorn-core/pull/533#discussion_r1410492780


##########
pkg/events/event_streaming.go:
##########
@@ -0,0 +1,180 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package events
+
+import (
+	"sync"
+	"time"
+
+	"go.uber.org/zap"
+
+	"github.com/apache/yunikorn-core/pkg/log"
+	"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
+)
+
+const defaultChannelBufSize = 100
+
+// EventStreaming implements the event streaming logic.
+// New events are immediately forwarded to all active consumers.
+type EventStreaming struct {
+	buffer       *eventRingBuffer
+	stopCh       chan struct{}
+	eventStreams map[*EventStream]eventConsumerDetails
+	sync.Mutex
+}
+
+type eventConsumerDetails struct {
+	local     chan *si.EventRecord
+	consumer  chan<- *si.EventRecord
+	stopCh    chan struct{}
+	name      string
+	createdAt time.Time
+}
+
+// EventStream handle type returned to the client that wants to capture the stream of events.
+type EventStream struct {
+	Events <-chan *si.EventRecord
+}
+
+// PublishEvent publishes an event to all event stream consumers.
+//
+// The streaming logic uses bridging to ensure proper ordering of existing and new events.
+// Events are sent to the "local" channel from where it is forwarded to the "consumer" channel.
+//
+// If "local" is full, it means that the consumer side has not processed the events at an appropriate pace.
+// Such a consumer is removed and the related channels are closed.
+func (e *EventStreaming) PublishEvent(event *si.EventRecord) {
+	e.Lock()
+	defer e.Unlock()
+
+	for consumer, details := range e.eventStreams {
+		if len(details.local) == defaultChannelBufSize {

Review Comment:
   Changed





Re: [PR] [YUNIKORN-1709] Add event streaming logic [yunikorn-core]

Posted by "pbacsko (via GitHub)" <gi...@apache.org>.
pbacsko closed pull request #533: [YUNIKORN-1709] Add event streaming logic
URL: https://github.com/apache/yunikorn-core/pull/533




Re: [PR] [YUNIKORN-1709] Add event streaming logic [yunikorn-core]

Posted by "pbacsko (via GitHub)" <gi...@apache.org>.
pbacsko commented on code in PR #533:
URL: https://github.com/apache/yunikorn-core/pull/533#discussion_r1409296986


##########
pkg/events/event_streaming.go:
##########
@@ -0,0 +1,180 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package events
+
+import (
+	"sync"
+	"time"
+
+	"go.uber.org/zap"
+
+	"github.com/apache/yunikorn-core/pkg/log"
+	"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
+)
+
+const defaultChannelBufSize = 100
+
+// EventStreaming implements the event streaming logic.
+// New events are immediately forwarded to all active consumers.
+type EventStreaming struct {
+	buffer       *eventRingBuffer
+	stopCh       chan struct{}
+	eventStreams map[*EventStream]eventConsumerDetails
+	sync.Mutex
+}
+
+type eventConsumerDetails struct {
+	local     chan *si.EventRecord
+	consumer  chan<- *si.EventRecord
+	stopCh    chan struct{}
+	name      string
+	createdAt time.Time
+}
+
+// EventStream handle type returned to the client that wants to capture the stream of events.
+type EventStream struct {
+	Events <-chan *si.EventRecord
+}
+
+// PublishEvent publishes an event to all event stream consumers.
+//
+// The streaming logic uses bridging to ensure proper ordering of existing and new events.
+// Events are sent to the "local" channel from where it is forwarded to the "consumer" channel.
+//
+// If "local" is full, it means that the consumer side has not processed the events at an appropriate pace.
+// Such a consumer is removed and the related channels are closed.
+func (e *EventStreaming) PublishEvent(event *si.EventRecord) {
+	e.Lock()
+	defer e.Unlock()
+
+	for consumer, details := range e.eventStreams {

Review Comment:
   Yes, a separate service will use the REST API (either batch or streaming, whatever is more convenient), just like everyone else. If this is what you mean, I agree with it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@yunikorn.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [YUNIKORN-1709] Add event streaming logic [yunikorn-core]

Posted by "zhuqi-lucas (via GitHub)" <gi...@apache.org>.
zhuqi-lucas commented on code in PR #533:
URL: https://github.com/apache/yunikorn-core/pull/533#discussion_r1410212498


##########
pkg/events/event_streaming.go:
##########
@@ -0,0 +1,180 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package events
+
+import (
+	"sync"
+	"time"
+
+	"go.uber.org/zap"
+
+	"github.com/apache/yunikorn-core/pkg/log"
+	"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
+)
+
+const defaultChannelBufSize = 100
+
+// EventStreaming implements the event streaming logic.
+// New events are immediately forwarded to all active consumers.
+type EventStreaming struct {
+	buffer       *eventRingBuffer
+	stopCh       chan struct{}
+	eventStreams map[*EventStream]eventConsumerDetails
+	sync.Mutex
+}
+
+type eventConsumerDetails struct {
+	local     chan *si.EventRecord
+	consumer  chan<- *si.EventRecord
+	stopCh    chan struct{}
+	name      string
+	createdAt time.Time
+}
+
+// EventStream handle type returned to the client that wants to capture the stream of events.
+type EventStream struct {
+	Events <-chan *si.EventRecord
+}
+
+// PublishEvent publishes an event to all event stream consumers.
+//
+// The streaming logic uses bridging to ensure proper ordering of existing and new events.
+// Events are sent to the "local" channel from where it is forwarded to the "consumer" channel.
+//
+// If "local" is full, it means that the consumer side has not processed the events at an appropriate pace.
+// Such a consumer is removed and the related channels are closed.
+func (e *EventStreaming) PublishEvent(event *si.EventRecord) {
+	e.Lock()
+	defer e.Unlock()
+
+	for consumer, details := range e.eventStreams {
+		if len(details.local) == defaultChannelBufSize {

Review Comment:
   Agreed, 1000 seems more reasonable.





Re: [PR] [YUNIKORN-1709] Add event streaming logic [yunikorn-core]

Posted by "pbacsko (via GitHub)" <gi...@apache.org>.
pbacsko commented on code in PR #533:
URL: https://github.com/apache/yunikorn-core/pull/533#discussion_r1412072876


##########
pkg/events/event_streaming.go:
##########
@@ -0,0 +1,180 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package events
+
+import (
+	"sync"
+	"time"
+
+	"go.uber.org/zap"
+
+	"github.com/apache/yunikorn-core/pkg/log"
+	"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
+)
+
+const defaultChannelBufSize = 1000
+
+// EventStreaming implements the event streaming logic.
+// New events are immediately forwarded to all active consumers.
+type EventStreaming struct {
+	buffer       *eventRingBuffer
+	stopCh       chan struct{}
+	eventStreams map[*EventStream]eventConsumerDetails
+	sync.Mutex
+}
+
+type eventConsumerDetails struct {
+	local     chan *si.EventRecord
+	consumer  chan<- *si.EventRecord
+	stopCh    chan struct{}
+	name      string
+	createdAt time.Time
+}
+
+// EventStream handle type returned to the client that wants to capture the stream of events.
+type EventStream struct {
+	Events <-chan *si.EventRecord
+}
+
+// PublishEvent publishes an event to all event stream consumers.
+//
+// The streaming logic uses bridging to ensure proper ordering of existing and new events.
+// Events are sent to the "local" channel from where it is forwarded to the "consumer" channel.
+//
+// If "local" is full, it means that the consumer side has not processed the events at an appropriate pace.
+// Such a consumer is removed and the related channels are closed.
+func (e *EventStreaming) PublishEvent(event *si.EventRecord) {
+	e.Lock()
+	defer e.Unlock()
+
+	for consumer, details := range e.eventStreams {
+		if len(details.local) == defaultChannelBufSize {
+			log.Log(log.Events).Warn("Listener buffer full due to potentially slow consumer, removing it")
+			e.removeEventStream(consumer)
+			continue
+		}
+
+		details.local <- event
+	}
+}
+
+// CreateEventStream sets up event streaming for a consumer. The returned EventStream object
+// contains a channel that can be used for reading.
+//
+// When a consumer is finished, it must call RemoveEventStream to free up resources.
+//
+// Consumers have an arbitrary name for logging purposes. The "count" parameter defines the number
+// of maximum historical events from the ring buffer.

Review Comment:
   Actually, "0" means no events. I have extended the comment, though.





Re: [PR] [YUNIKORN-1709] Add event streaming logic [yunikorn-core]

Posted by "wilfred-s (via GitHub)" <gi...@apache.org>.
wilfred-s commented on code in PR #533:
URL: https://github.com/apache/yunikorn-core/pull/533#discussion_r1407184297


##########
pkg/webservice/handlers.go:
##########
@@ -918,3 +918,61 @@ func getEvents(w http.ResponseWriter, r *http.Request) {
 		buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
 	}
 }
+
+func getStream(w http.ResponseWriter, r *http.Request) {

Review Comment:
   We should limit the number of streams that can be open at the same time here. Even if the limit is some "high" number, we need to be able to turn down new requests as a safety mechanism.
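
A minimal sketch of such a limit, assuming a simple mutex-protected counter. The `streamLimiter` type and its method names are hypothetical and are not part of yunikorn-core; the limit value is arbitrary.

```go
package main

import (
	"fmt"
	"sync"
)

// streamLimiter caps the number of concurrently open event streams.
// Hypothetical helper for illustration only.
type streamLimiter struct {
	mu     sync.Mutex
	active int
	limit  int
}

func newStreamLimiter(limit int) *streamLimiter {
	return &streamLimiter{limit: limit}
}

// tryAcquire reserves a slot and reports whether a new stream may be opened.
func (l *streamLimiter) tryAcquire() bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.active >= l.limit {
		return false
	}
	l.active++
	return true
}

// release frees a slot; it should be called when a stream is closed.
func (l *streamLimiter) release() {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.active > 0 {
		l.active--
	}
}

func main() {
	limiter := newStreamLimiter(2)
	fmt.Println(limiter.tryAcquire()) // first stream accepted
	fmt.Println(limiter.tryAcquire()) // second stream accepted
	fmt.Println(limiter.tryAcquire()) // limit reached, request rejected
	limiter.release()
	fmt.Println(limiter.tryAcquire()) // a slot is free again
}
```

In a handler like `getStream`, a failed `tryAcquire` could be answered with an error response before any stream is created, with `release` deferred on the success path. Which status code to return is left open here.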



##########
pkg/webservice/webservice.go:
##########
@@ -63,12 +64,13 @@ func loggingHandler(inner http.Handler, name string) http.HandlerFunc {
 // TODO we need the port to be configurable
 func (m *WebService) StartWebApp() {
 	router := newRouter()
+	// Important: do not use ReadTimeout, WriteTimeout or IdleTimeout because those can break the event streaming

Review Comment:
   This is a potential security issue. We need to be able to protect ourselves from a Slowloris attack: YUNIKORN-2182
   
   Since we are using Go 1.20 or later, we should look at https://github.com/golang/go/issues/54136 anyway to implement timeouts.
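
One possible server-level mitigation, sketched under assumptions: `http.Server.ReadHeaderTimeout` bounds only the time a client may take to send the request headers, so it does not cut off a long-lived response body. The function name, address and timeout value below are made up for illustration; this is not what the PR implements.

```go
package main

import (
	"fmt"
	"net/http"
	"time"
)

// newStreamingServer builds a server that limits how long a client may take
// to send its request headers while leaving response writes unbounded at the
// server level. The name and the 10s value are assumptions for this sketch.
func newStreamingServer(addr string, handler http.Handler) *http.Server {
	return &http.Server{
		Addr:    addr,
		Handler: handler,
		// Covers the header-reading phase that a Slowloris-style client stalls on.
		ReadHeaderTimeout: 10 * time.Second,
		// WriteTimeout stays at its zero value so streaming responses are not
		// terminated by a fixed server-wide deadline.
	}
}

func main() {
	srv := newStreamingServer(":8080", http.NewServeMux())
	fmt.Println("read header timeout:", srv.ReadHeaderTimeout)
	fmt.Println("write timeout:", srv.WriteTimeout)
}
```

Per-request write deadlines can then be handled inside the streaming handler itself rather than on the server.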



##########
pkg/scheduler/objects/common_test.go:
##########


Review Comment:
   Not sure why these changes are needed.



##########
pkg/webservice/handlers.go:
##########
@@ -918,3 +918,61 @@ func getEvents(w http.ResponseWriter, r *http.Request) {
 		buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
 	}
 }
+
+func getStream(w http.ResponseWriter, r *http.Request) {
+	writeHeaders(w)
+	eventSystem := events.GetEventSystem()
+	if !eventSystem.IsEventTrackingEnabled() {
+		buildJSONErrorResponse(w, "Event tracking is disabled", http.StatusInternalServerError)
+		return
+	}
+
+	f, ok := w.(http.Flusher)
+	if !ok {
+		buildJSONErrorResponse(w, "Writer does not implement http.Flusher", http.StatusInternalServerError)
+		return
+	}
+
+	var count uint64
+	if countStr := r.URL.Query().Get("count"); countStr != "" {
+		var err error
+		count, err = strconv.ParseUint(countStr, 10, 64)
+		if err != nil {
+			buildJSONErrorResponse(w, err.Error(), http.StatusBadRequest)
+			return
+		}
+	}
+
+	enc := json.NewEncoder(w)
+	stream := eventSystem.CreateEventStream(r.Host, count)
+
+	// Reading events in an infinite loop until either the client disconnects or Yunikorn closes the channel.
+	// This results in a persistent HTTP connection where the message body is never closed.
+	// We don't use timeouts and since HTTP 1.1 clients are expected to handle persistent connections by default.

Review Comment:
   See the earlier comment: we should use the ResponseController with a sliding write deadline.
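
A rough sketch of what a sliding write deadline could look like with `http.ResponseController` (available since Go 1.20). The `streamLines` and `fetchDemo` functions, the string payloads and the 5 second budget are assumptions for illustration; the real handler encodes `si.EventRecord` values as JSON.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"time"
)

// writeTimeout is an arbitrary per-write budget chosen for this sketch.
const writeTimeout = 5 * time.Second

// streamLines writes each line to the client and moves the write deadline
// forward before every write, so only a client that stalls on a single write
// runs into the deadline.
func streamLines(w http.ResponseWriter, lines []string) error {
	rc := http.NewResponseController(w)
	for _, line := range lines {
		if err := rc.SetWriteDeadline(time.Now().Add(writeTimeout)); err != nil {
			return err
		}
		if _, err := io.WriteString(w, line+"\n"); err != nil {
			return err
		}
		if err := rc.Flush(); err != nil {
			return err
		}
	}
	return nil
}

// fetchDemo serves streamLines from a local test server and returns the body
// as read by a client.
func fetchDemo() (string, error) {
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		_ = streamLines(w, []string{"event-1", "event-2"})
	}))
	defer srv.Close()

	resp, err := http.Get(srv.URL)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	return string(body), err
}

func main() {
	body, err := fetchDemo()
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Print(body)
}
```

Because the deadline is reset per write, the connection can stay open indefinitely as long as the client keeps reading.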





Re: [PR] [YUNIKORN-1709] Add event streaming logic [yunikorn-core]

Posted by "pbacsko (via GitHub)" <gi...@apache.org>.
pbacsko commented on PR #533:
URL: https://github.com/apache/yunikorn-core/pull/533#issuecomment-1847317403

   cc @wilfred-s 




[GitHub] [yunikorn-core] pbacsko commented on a diff in pull request #533: [WIP] [YUNIKORN-1709] Store events separately for event streaming

Posted by "pbacsko (via GitHub)" <gi...@apache.org>.
pbacsko commented on code in PR #533:
URL: https://github.com/apache/yunikorn-core/pull/533#discussion_r1181658629


##########
pkg/events/event_streaming.go:
##########
@@ -0,0 +1,192 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package events
+
+import (
+	"sync"
+	"time"
+
+	"go.uber.org/zap"
+
+	"github.com/apache/yunikorn-core/pkg/log"
+	"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
+)
+
+const defaultChannelBufSize = 100
+
+type EventStreaming struct {
+	buffer       *EventRingBuffer
+	stopCh       chan struct{}
+	eventStreams map[EventStream]consumerDetails
+	sync.Mutex
+}
+
+type consumerDetails struct {
+	local     chan *si.EventRecord
+	consumer  chan<- si.EventRecord
+	stopCh    chan struct{}
+	name      string
+	createdAt time.Time
+}
+
+// EventStream the type returned to the client who wants to capture the stream of events.
+type EventStream struct {
+	events <-chan si.EventRecord
+}
+
+type EventHistoryOpts struct {
+	interval   time.Duration
+	eventCount int
+}
+
+// PublishEvent publishes an event to event stream consumers. Events are first stored in a ring buffer,
+// then all clients are notified. Clients whose buffer is full are considered slow and removed.
+func (e *EventStreaming) PublishEvent(event *si.EventRecord) {
+	e.Lock()
+	defer e.Unlock()
+
+	e.buffer.Add(event)
+	for consumer, details := range e.eventStreams {
+		if len(details.local) == defaultChannelBufSize {
+			log.Logger().Warn("listener buffer full due to potentially slow consumer, removing it")
+			e.removeEventStream(consumer)
+			continue
+		}
+
+		details.local <- event
+	}
+}
+
+// CreateEventStream sets up event streaming for a consumer. The returned EventStream object
+// contains a channel that can be used for reading.
+// When a consumer is finished, it is supposed to call RemoveEventStream to free up resources.
+// Consumers have an arbitrary name for logging purposes. The "opts" argument can be used to retrieve historical
+// entries.
+func (e *EventStreaming) CreateEventStream(name string, opts EventHistoryOpts) EventStream {
+	consumer := make(chan si.EventRecord, defaultChannelBufSize)
+	stream := EventStream{
+		events: consumer,
+	}
+	local := make(chan *si.EventRecord, defaultChannelBufSize)
+	stop := make(chan struct{})
+	history := e.createEventStreamInternal(stream, local, consumer, stop, name, opts)
+
+	go func(consumer chan<- si.EventRecord, local <-chan *si.EventRecord, stop <-chan struct{}) {

Review Comment:
   So the idea here is this: first we need to send the event history, which must not be interleaved with new events. So we send out the history first, while new events are collected in the channel buffer of `local`. Then we continue bridging.
   
   It's possible to avoid this extra layer of indirection, but then we need to add states (processing history / can send) and have to collect new events in a slice. When we're done sending historical events, we need to send out the collected events, and processing of new events cannot continue (like in `PodEventHandler.RecoveryDone()`). I don't like this because detecting slow consumers is more difficult: after sending out the history, `len(consumer)` might be close to full. Then we start sending the events from the temporary slice, it gets full, but we can't tell whether that's because we're too fast or because the receiver side is slow.
   
   This wouldn't be an issue if we didn't support sending past events, but there you go.
   So to me, this extra bridging is much more convenient.
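
The bridging idea can be sketched in isolation as follows. This is a simplified illustration that uses strings instead of `si.EventRecord`; the `bridge` function and the buffer size are hypothetical and leave out the locking and duplicate filtering of the actual code.

```go
package main

import "fmt"

// bufSize is an arbitrary buffer size for this sketch.
const bufSize = 8

// bridge replays history on the returned channel first and only then forwards
// items arriving on local, so new items cannot overtake historical ones.
// While history is being sent, new items simply queue up in local's buffer.
func bridge(history []string, local <-chan string, stop <-chan struct{}) <-chan string {
	consumer := make(chan string, bufSize)
	// send delivers one item unless stop is closed first.
	send := func(ev string) bool {
		select {
		case consumer <- ev:
			return true
		case <-stop:
			return false
		}
	}
	go func() {
		defer close(consumer)
		for _, ev := range history {
			if !send(ev) {
				return
			}
		}
		for {
			select {
			case <-stop:
				return
			case ev, ok := <-local:
				if !ok || !send(ev) {
					return
				}
			}
		}
	}()
	return consumer
}

func main() {
	local := make(chan string, bufSize)
	stop := make(chan struct{})
	// A new event arrives before the history has been sent.
	local <- "new-1"
	out := bridge([]string{"old-1", "old-2"}, local, stop)
	for i := 0; i < 3; i++ {
		fmt.Println(<-out)
	}
	close(stop)
}
```

With this layout, a full `local` buffer is a signal that the receiving side is not keeping up, which is the property the comment above relies on.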





Re: [PR] [YUNIKORN-1709] Add event streaming logic [yunikorn-core]

Posted by "zhuqi-lucas (via GitHub)" <gi...@apache.org>.
zhuqi-lucas commented on code in PR #533:
URL: https://github.com/apache/yunikorn-core/pull/533#discussion_r1412095936


##########
pkg/events/event_streaming.go:
##########
@@ -0,0 +1,179 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package events
+
+import (
+	"sync"
+	"time"
+
+	"go.uber.org/zap"
+
+	"github.com/apache/yunikorn-core/pkg/log"
+	"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
+)
+
+const defaultChannelBufSize = 1000
+
+// EventStreaming implements the event streaming logic.
+// New events are immediately forwarded to all active consumers.
+type EventStreaming struct {
+	buffer       *eventRingBuffer
+	stopCh       chan struct{}
+	eventStreams map[*EventStream]eventConsumerDetails
+	sync.Mutex
+}
+
+type eventConsumerDetails struct {
+	local     chan *si.EventRecord
+	consumer  chan<- *si.EventRecord
+	stopCh    chan struct{}
+	name      string
+	createdAt time.Time
+}
+
+// EventStream handle type returned to the client that wants to capture the stream of events.
+type EventStream struct {
+	Events <-chan *si.EventRecord
+}
+
+// PublishEvent publishes an event to all event stream consumers.
+//
+// The streaming logic uses bridging to ensure proper ordering of existing and new events.
+// Events are sent to the "local" channel from where it is forwarded to the "consumer" channel.
+//
+// If "local" is full, it means that the consumer side has not processed the events at an appropriate pace.
+// Such a consumer is removed and the related channels are closed.
+func (e *EventStreaming) PublishEvent(event *si.EventRecord) {
+	e.Lock()
+	defer e.Unlock()
+
+	for consumer, details := range e.eventStreams {
+		if len(details.local) == defaultChannelBufSize {
+			log.Log(log.Events).Warn("Listener buffer full due to potentially slow consumer, removing it")
+			e.removeEventStream(consumer)
+			continue
+		}
+
+		details.local <- event
+	}
+}
+
+// CreateEventStream sets up event streaming for a consumer. The returned EventStream object
+// contains a channel that can be used for reading.
+//
+// When a consumer is finished, it must call RemoveEventStream to free up resources.
+//
+// Consumers have an arbitrary name for logging purposes. The "count" parameter defines the number
+// of maximum historical events from the ring buffer. "0" is a valid value and means no past events.
+func (e *EventStreaming) CreateEventStream(name string, count uint64) *EventStream {

Review Comment:
   Hi @pbacsko, I am wondering whether we could have an option to include all historical events without setting any count. Thanks!





Re: [PR] [YUNIKORN-1709] Add event streaming logic [yunikorn-core]

Posted by "pbacsko (via GitHub)" <gi...@apache.org>.
pbacsko commented on code in PR #533:
URL: https://github.com/apache/yunikorn-core/pull/533#discussion_r1412072367


##########
pkg/events/event_streaming.go:
##########
@@ -0,0 +1,180 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package events
+
+import (
+	"sync"
+	"time"
+
+	"go.uber.org/zap"
+
+	"github.com/apache/yunikorn-core/pkg/log"
+	"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
+)
+
+const defaultChannelBufSize = 1000
+
+// EventStreaming implements the event streaming logic.
+// New events are immediately forwarded to all active consumers.
+type EventStreaming struct {
+	buffer       *eventRingBuffer
+	stopCh       chan struct{}
+	eventStreams map[*EventStream]eventConsumerDetails
+	sync.Mutex
+}
+
+type eventConsumerDetails struct {
+	local     chan *si.EventRecord
+	consumer  chan<- *si.EventRecord
+	stopCh    chan struct{}
+	name      string
+	createdAt time.Time
+}
+
+// EventStream handle type returned to the client that wants to capture the stream of events.
+type EventStream struct {
+	Events <-chan *si.EventRecord
+}
+
+// PublishEvent publishes an event to all event stream consumers.
+//
+// The streaming logic uses bridging to ensure proper ordering of existing and new events.
+// Events are sent to the "local" channel from where it is forwarded to the "consumer" channel.
+//
+// If "local" is full, it means that the consumer side has not processed the events at an appropriate pace.
+// Such a consumer is removed and the related channels are closed.
+func (e *EventStreaming) PublishEvent(event *si.EventRecord) {
+	e.Lock()
+	defer e.Unlock()
+
+	for consumer, details := range e.eventStreams {
+		if len(details.local) == defaultChannelBufSize {
+			log.Log(log.Events).Warn("Listener buffer full due to potentially slow consumer, removing it")
+			e.removeEventStream(consumer)
+			continue
+		}
+
+		details.local <- event
+	}
+}
+
+// CreateEventStream sets up event streaming for a consumer. The returned EventStream object
+// contains a channel that can be used for reading.
+//
+// When a consumer is finished, it must call RemoveEventStream to free up resources.
+//
+// Consumers have an arbitrary name for logging purposes. The "count" parameter defines the number
+// of maximum historical events from the ring buffer.
+func (e *EventStreaming) CreateEventStream(name string, count uint64) *EventStream {
+	consumer := make(chan *si.EventRecord, defaultChannelBufSize)
+	stream := &EventStream{
+		Events: consumer,
+	}
+	local := make(chan *si.EventRecord, defaultChannelBufSize)
+	stop := make(chan struct{})
+	history := e.createEventStreamInternal(stream, local, consumer, stop, name, count)
+
+	go func(consumer chan<- *si.EventRecord, local <-chan *si.EventRecord, stop <-chan struct{}) {
+		// Store the refs of historical events; it's possible that some events are added to the
+		// ring buffer and also to "local" channel.
+		// It is because we use two separate locks, so event updates are not atomic.
+		// Example: an event has been just added to the ring buffer (before createEventStreamInternal()),
+		// and execution is about to enter PublishEvent(); at this point we have an updated "eventStreams"
+		// map, so "local" will also contain the new event.
+		seen := make(map[*si.EventRecord]bool)
+		for _, event := range history {
+			consumer <- event
+			seen[event] = true
+		}
+		for {
+			select {
+			case <-e.stopCh:
+				close(consumer)
+				return
+			case <-stop:
+				close(consumer)
+				return
+			case event := <-local:
+				if seen[event] {
+					continue
+				}
+				// since events are processed in a single goroutine, doubling is no longer
+				// possible at this point
+				seen = make(map[*si.EventRecord]bool)
+				consumer <- event
+			}
+		}
+	}(consumer, local, stop)
+
+	log.Log(log.Events).Info("Created event stream", zap.String("consumer name", name))
+	return stream
+}
+
+func (e *EventStreaming) createEventStreamInternal(stream *EventStream,
+	local chan *si.EventRecord,
+	consumer chan *si.EventRecord,
+	stop chan struct{},
+	name string,
+	count uint64) []*si.EventRecord {
+	// stuff that needs locking
+	e.Lock()
+	defer e.Unlock()
+
+	e.eventStreams[stream] = eventConsumerDetails{
+		local:     local,
+		consumer:  consumer,
+		stopCh:    stop,
+		name:      name,
+		createdAt: time.Now(),
+	}
+
+	return e.buffer.GetRecentEvents(count)
+}
+
+// RemoveEventStream stops the streaming for a given consumer. Must be called to avoid resource leaks.
+func (e *EventStreaming) RemoveEventStream(consumer *EventStream) {
+	e.Lock()
+	defer e.Unlock()
+
+	e.removeEventStream(consumer)
+}
+
+func (e *EventStreaming) removeEventStream(consumer *EventStream) {
+	if details, ok := e.eventStreams[consumer]; ok {
+		log.Log(log.Events).Info("Removing event stream consumer", zap.String("name", details.name),

Review Comment:
   The external reason why it was closed is logged at the appropriate place in the handler. Here, the code doesn't have to know about it. 





Re: [PR] [YUNIKORN-1709] Add event streaming logic [yunikorn-core]

Posted by "pbacsko (via GitHub)" <gi...@apache.org>.
pbacsko commented on code in PR #533:
URL: https://github.com/apache/yunikorn-core/pull/533#discussion_r1412071701


##########
pkg/events/event_streaming.go:
##########
@@ -0,0 +1,180 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package events
+
+import (
+	"sync"
+	"time"
+
+	"go.uber.org/zap"
+
+	"github.com/apache/yunikorn-core/pkg/log"
+	"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
+)
+
+const defaultChannelBufSize = 1000
+
+// EventStreaming implements the event streaming logic.
+// New events are immediately forwarded to all active consumers.
+type EventStreaming struct {
+	buffer       *eventRingBuffer
+	stopCh       chan struct{}
+	eventStreams map[*EventStream]eventConsumerDetails
+	sync.Mutex
+}
+
+type eventConsumerDetails struct {
+	local     chan *si.EventRecord
+	consumer  chan<- *si.EventRecord
+	stopCh    chan struct{}
+	name      string
+	createdAt time.Time
+}
+
+// EventStream handle type returned to the client that wants to capture the stream of events.
+type EventStream struct {
+	Events <-chan *si.EventRecord
+}
+
+// PublishEvent publishes an event to all event stream consumers.
+//
+// The streaming logic uses bridging to ensure proper ordering of existing and new events.
+// Events are sent to the "local" channel from where it is forwarded to the "consumer" channel.
+//
+// If "local" is full, it means that the consumer side has not processed the events at an appropriate pace.
+// Such a consumer is removed and the related channels are closed.
+func (e *EventStreaming) PublishEvent(event *si.EventRecord) {
+	e.Lock()
+	defer e.Unlock()
+
+	for consumer, details := range e.eventStreams {
+		if len(details.local) == defaultChannelBufSize {
+			log.Log(log.Events).Warn("Listener buffer full due to potentially slow consumer, removing it")
+			e.removeEventStream(consumer)
+			continue
+		}
+
+		details.local <- event
+	}
+}
+
+// CreateEventStream sets up event streaming for a consumer. The returned EventStream object
+// contains a channel that can be used for reading.
+//
+// When a consumer is finished, it must call RemoveEventStream to free up resources.
+//
+// Consumers have an arbitrary name for logging purposes. The "count" parameter defines the number
+// of maximum historical events from the ring buffer.
+func (e *EventStreaming) CreateEventStream(name string, count uint64) *EventStream {
+	consumer := make(chan *si.EventRecord, defaultChannelBufSize)
+	stream := &EventStream{
+		Events: consumer,
+	}
+	local := make(chan *si.EventRecord, defaultChannelBufSize)
+	stop := make(chan struct{})
+	history := e.createEventStreamInternal(stream, local, consumer, stop, name, count)
+
+	go func(consumer chan<- *si.EventRecord, local <-chan *si.EventRecord, stop <-chan struct{}) {
+		// Store the refs of historical events; it's possible that some events are added to the
+		// ring buffer and also to "local" channel.
+		// It is because we use two separate locks, so event updates are not atomic.
+		// Example: an event has been just added to the ring buffer (before createEventStreamInternal()),
+		// and execution is about to enter PublishEvent(); at this point we have an updated "eventStreams"
+		// map, so "local" will also contain the new event.
+		seen := make(map[*si.EventRecord]bool)
+		for _, event := range history {
+			consumer <- event
+			seen[event] = true
+		}
+		for {
+			select {
+			case <-e.stopCh:
+				close(consumer)
+				return
+			case <-stop:
+				close(consumer)
+				return
+			case event := <-local:
+				if seen[event] {
+					continue
+				}
+				// since events are processed in a single goroutine, doubling is no longer
+				// possible at this point
+				seen = make(map[*si.EventRecord]bool)
+				consumer <- event
+			}
+		}
+	}(consumer, local, stop)
+
+	log.Log(log.Events).Info("Created event stream", zap.String("consumer name", name))
+	return stream
+}
+
+func (e *EventStreaming) createEventStreamInternal(stream *EventStream,

Review Comment:
   Done





Re: [PR] [YUNIKORN-1709] Add event streaming logic [yunikorn-core]

Posted by "wilfred-s (via GitHub)" <gi...@apache.org>.
wilfred-s commented on code in PR #533:
URL: https://github.com/apache/yunikorn-core/pull/533#discussion_r1464323732


##########
pkg/events/event_system.go:
##########
@@ -191,11 +245,21 @@ func (ec *EventSystemImpl) isRestartNeeded() bool {
 	return ec.readIsTrackingEnabled() != ec.trackingEnabled
 }
 
+// Restart restarts the event system, used during config update.
 func (ec *EventSystemImpl) Restart() {
 	ec.Stop()
 	ec.StartServiceWithPublisher(true)
 }
 
+// VisibleForTesting

Review Comment:
   Fix the comment to use the standard Go layout, as it is an exported function now.



##########
pkg/events/event_system.go:
##########
@@ -94,21 +141,25 @@ func (ec *EventSystemImpl) GetRingBufferCapacity() uint64 {
 // VisibleForTesting
 func Init() {
 	store := newEventStore()
+	buffer := newEventRingBuffer(defaultRingBufferSize)
 	ev = &EventSystemImpl{
 		Store:         store,
 		channel:       make(chan *si.EventRecord, defaultEventChannelSize),
 		stop:          make(chan bool),
 		stopped:       false,
 		publisher:     CreateShimPublisher(store),
-		eventBuffer:   newEventRingBuffer(defaultRingBufferSize),
+		eventBuffer:   buffer,
 		eventSystemId: fmt.Sprintf("event-system-%d", time.Now().Unix()),
+		streaming:     NewEventStreaming(buffer),
 	}
 }
 
+// StartService starts the event processing in the background. See the interface for details.
 func (ec *EventSystemImpl) StartService() {
 	ec.StartServiceWithPublisher(true)
 }
 
+// VisibleForTesting

Review Comment:
   Fix the comment to use the standard Go layout, as it is an exported function now.



##########
pkg/events/event_ringbuffer.go:
##########
@@ -70,14 +70,38 @@ func (e *eventRingBuffer) Add(event *si.EventRecord) {
 	e.id++
 }
 
-// GetEventsFromID returns "count" number of event records from "id" if possible. The id can be determined from
+func (e *eventRingBuffer) GetRecentEvents(count uint64) []*si.EventRecord {

Review Comment:
   Add a comment using the standard Go layout, as it is an exported function.
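
For reference, Go doc comments conventionally start with the name of the identifier they describe. The sketch below shows that layout on a simplified, hypothetical stand-in for the ring buffer; the string events, the oldest-first ordering and the slicing logic are assumptions and do not reproduce the actual `eventRingBuffer` implementation.

```go
package main

import "fmt"

// ringBuffer is a simplified, hypothetical stand-in for the event ring buffer.
type ringBuffer struct {
	events []string
}

// GetRecentEvents returns at most count of the most recently added events,
// oldest first. A count of 0 returns an empty slice.
func (r *ringBuffer) GetRecentEvents(count uint64) []string {
	n := uint64(len(r.events))
	if count > n {
		count = n
	}
	recent := make([]string, count)
	copy(recent, r.events[n-count:])
	return recent
}

func main() {
	buf := &ringBuffer{events: []string{"a", "b", "c"}}
	fmt.Println(buf.GetRecentEvents(2))
	fmt.Println(len(buf.GetRecentEvents(0)))
}
```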





[GitHub] [yunikorn-core] codecov[bot] commented on pull request #533: [WIP] [YUNIKORN-1709] Store events separately for event streaming

Posted by "codecov[bot] (via GitHub)" <gi...@apache.org>.
codecov[bot] commented on PR #533:
URL: https://github.com/apache/yunikorn-core/pull/533#issuecomment-1527408886

   ## [Codecov](https://codecov.io/gh/apache/yunikorn-core/pull/533?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#533](https://codecov.io/gh/apache/yunikorn-core/pull/533?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (6757902) into [master](https://codecov.io/gh/apache/yunikorn-core/commit/cec575e4b8989eac158976320d6506ff7c486baf?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (cec575e) will **decrease** coverage by `0.24%`.
   > The diff coverage is `61.05%`.
   
   ```diff
   @@            Coverage Diff             @@
   ##           master     #533      +/-   ##
   ==========================================
   - Coverage   75.05%   74.81%   -0.24%     
   ==========================================
     Files          70       72       +2     
     Lines       11323    11510     +187     
   ==========================================
   + Hits         8498     8611     +113     
   - Misses       2528     2598      +70     
   - Partials      297      301       +4     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/yunikorn-core/pull/533?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [pkg/events/event\_streaming.go](https://codecov.io/gh/apache/yunikorn-core/pull/533?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGtnL2V2ZW50cy9ldmVudF9zdHJlYW1pbmcuZ28=) | `29.54% <29.54%> (ø)` | |
   | [pkg/events/event\_ringbuffer.go](https://codecov.io/gh/apache/yunikorn-core/pull/533?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGtnL2V2ZW50cy9ldmVudF9yaW5nYnVmZmVyLmdv) | `87.50% <87.50%> (ø)` | |
   | [pkg/events/event\_cache.go](https://codecov.io/gh/apache/yunikorn-core/pull/533?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGtnL2V2ZW50cy9ldmVudF9jYWNoZS5nbw==) | `93.61% <100.00%> (+0.43%)` | :arrow_up: |
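
The percentages in the report follow from the raw counts in the coverage diff, since coverage is hits divided by lines. A small Go check that uses only the numbers quoted above (the helper name `coverage` is made up for this sketch):

```go
package main

import "fmt"

// coverage returns hits/lines expressed as a percentage.
func coverage(hits, lines int) float64 {
	return 100 * float64(hits) / float64(lines)
}

func main() {
	before := coverage(8498, 11323) // master
	after := coverage(8611, 11510)  // PR #533
	fmt.Printf("master: %.2f%%\n", before)
	fmt.Printf("PR:     %.2f%%\n", after)
	fmt.Printf("delta:  %+.2f\n", after-before)
}
```

Rounded to two decimals, this reproduces the 75.05%, 74.81% and -0.24 figures shown in the diff.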
   
   




Re: [PR] [YUNIKORN-1709] Add event streaming logic [yunikorn-core]

Posted by "zhuqi-lucas (via GitHub)" <gi...@apache.org>.
zhuqi-lucas commented on code in PR #533:
URL: https://github.com/apache/yunikorn-core/pull/533#discussion_r1412109748


##########
pkg/events/event_streaming.go:
##########
@@ -0,0 +1,179 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package events
+
+import (
+	"sync"
+	"time"
+
+	"go.uber.org/zap"
+
+	"github.com/apache/yunikorn-core/pkg/log"
+	"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
+)
+
+const defaultChannelBufSize = 1000
+
+// EventStreaming implements the event streaming logic.
+// New events are immediately forwarded to all active consumers.
+type EventStreaming struct {
+	buffer       *eventRingBuffer
+	stopCh       chan struct{}
+	eventStreams map[*EventStream]eventConsumerDetails
+	sync.Mutex
+}
+
+type eventConsumerDetails struct {
+	local     chan *si.EventRecord
+	consumer  chan<- *si.EventRecord
+	stopCh    chan struct{}
+	name      string
+	createdAt time.Time
+}
+
+// EventStream handle type returned to the client that wants to capture the stream of events.
+type EventStream struct {
+	Events <-chan *si.EventRecord
+}
+
+// PublishEvent publishes an event to all event stream consumers.
+//
+// The streaming logic uses bridging to ensure proper ordering of existing and new events.
+// Events are sent to the "local" channel from where it is forwarded to the "consumer" channel.
+//
+// If "local" is full, it means that the consumer side has not processed the events at an appropriate pace.
+// Such a consumer is removed and the related channels are closed.
+func (e *EventStreaming) PublishEvent(event *si.EventRecord) {
+	e.Lock()
+	defer e.Unlock()
+
+	for consumer, details := range e.eventStreams {
+		if len(details.local) == defaultChannelBufSize {
+			log.Log(log.Events).Warn("Listener buffer full due to potentially slow consumer, removing it")
+			e.removeEventStream(consumer)
+			continue
+		}
+
+		details.local <- event
+	}
+}
+
+// CreateEventStream sets up event streaming for a consumer. The returned EventStream object
+// contains a channel that can be used for reading.
+//
+// When a consumer is finished, it must call RemoveEventStream to free up resources.
+//
+// Consumers have an arbitrary name for logging purposes. The "count" parameter defines the number
+// of maximum historical events from the ring buffer. "0" is a valid value and means no past events.
+func (e *EventStreaming) CreateEventStream(name string, count uint64) *EventStream {

Review Comment:
   Sure, it's a solution, agree!





Re: [PR] [YUNIKORN-1709] Add event streaming logic [yunikorn-core]

Posted by "zhuqi-lucas (via GitHub)" <gi...@apache.org>.
zhuqi-lucas commented on code in PR #533:
URL: https://github.com/apache/yunikorn-core/pull/533#discussion_r1411754419


##########
pkg/events/event_streaming.go:
##########
@@ -0,0 +1,180 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package events
+
+import (
+	"sync"
+	"time"
+
+	"go.uber.org/zap"
+
+	"github.com/apache/yunikorn-core/pkg/log"
+	"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
+)
+
+const defaultChannelBufSize = 1000
+
+// EventStreaming implements the event streaming logic.
+// New events are immediately forwarded to all active consumers.
+type EventStreaming struct {
+	buffer       *eventRingBuffer
+	stopCh       chan struct{}
+	eventStreams map[*EventStream]eventConsumerDetails
+	sync.Mutex
+}
+
+type eventConsumerDetails struct {
+	local     chan *si.EventRecord
+	consumer  chan<- *si.EventRecord
+	stopCh    chan struct{}
+	name      string
+	createdAt time.Time
+}
+
+// EventStream handle type returned to the client that wants to capture the stream of events.
+type EventStream struct {
+	Events <-chan *si.EventRecord
+}
+
+// PublishEvent publishes an event to all event stream consumers.
+//
+// The streaming logic uses bridging to ensure proper ordering of existing and new events.
+// Events are sent to the "local" channel from where it is forwarded to the "consumer" channel.
+//
+// If "local" is full, it means that the consumer side has not processed the events at an appropriate pace.
+// Such a consumer is removed and the related channels are closed.
+func (e *EventStreaming) PublishEvent(event *si.EventRecord) {
+	e.Lock()
+	defer e.Unlock()
+
+	for consumer, details := range e.eventStreams {
+		if len(details.local) == defaultChannelBufSize {
+			log.Log(log.Events).Warn("Listener buffer full due to potentially slow consumer, removing it")
+			e.removeEventStream(consumer)
+			continue
+		}
+
+		details.local <- event
+	}
+}
+
+// CreateEventStream sets up event streaming for a consumer. The returned EventStream object
+// contains a channel that can be used for reading.
+//
+// When a consumer is finished, it must call RemoveEventStream to free up resources.
+//
+// Consumers have an arbitrary name for logging purposes. The "count" parameter defines the number
+// of maximum historical events from the ring buffer.

Review Comment:
   Nit:
   
   Add more comments about the count: when we want to get all the historical events from the ring buffer, we can set count to 0.





Re: [PR] [YUNIKORN-1709] Add event streaming logic [yunikorn-core]

Posted by "pbacsko (via GitHub)" <gi...@apache.org>.
pbacsko commented on code in PR #533:
URL: https://github.com/apache/yunikorn-core/pull/533#discussion_r1412100072


##########
pkg/events/event_streaming.go:
##########
@@ -0,0 +1,179 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package events
+
+import (
+	"sync"
+	"time"
+
+	"go.uber.org/zap"
+
+	"github.com/apache/yunikorn-core/pkg/log"
+	"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
+)
+
+const defaultChannelBufSize = 1000
+
+// EventStreaming implements the event streaming logic.
+// New events are immediately forwarded to all active consumers.
+type EventStreaming struct {
+	buffer       *eventRingBuffer
+	stopCh       chan struct{}
+	eventStreams map[*EventStream]eventConsumerDetails
+	sync.Mutex
+}
+
+type eventConsumerDetails struct {
+	local     chan *si.EventRecord
+	consumer  chan<- *si.EventRecord
+	stopCh    chan struct{}
+	name      string
+	createdAt time.Time
+}
+
+// EventStream handle type returned to the client that wants to capture the stream of events.
+type EventStream struct {
+	Events <-chan *si.EventRecord
+}
+
+// PublishEvent publishes an event to all event stream consumers.
+//
+// The streaming logic uses bridging to ensure proper ordering of existing and new events.
+// Events are sent to the "local" channel from where it is forwarded to the "consumer" channel.
+//
+// If "local" is full, it means that the consumer side has not processed the events at an appropriate pace.
+// Such a consumer is removed and the related channels are closed.
+func (e *EventStreaming) PublishEvent(event *si.EventRecord) {
+	e.Lock()
+	defer e.Unlock()
+
+	for consumer, details := range e.eventStreams {
+		if len(details.local) == defaultChannelBufSize {
+			log.Log(log.Events).Warn("Listener buffer full due to potentially slow consumer, removing it")
+			e.removeEventStream(consumer)
+			continue
+		}
+
+		details.local <- event
+	}
+}
+
+// CreateEventStream sets up event streaming for a consumer. The returned EventStream object
+// contains a channel that can be used for reading.
+//
+// When a consumer is finished, it must call RemoveEventStream to free up resources.
+//
+// Consumers have an arbitrary name for logging purposes. The "count" parameter defines the number
+// of maximum historical events from the ring buffer. "0" is a valid value and means no past events.
+func (e *EventStreaming) CreateEventStream(name string, count uint64) *EventStream {

Review Comment:
   We'll see, let's talk about it in a different JIRA. It's possible even now, though: you just have to use math.MaxUint64 and that does the job. Numbers that are too high aren't an issue.
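
A sketch of why an oversized count is harmless, assuming the ring buffer clamps the requested count to the number of stored events (the actual `GetRecentEvents` body is not shown in this thread). The helper `recent` and the string events are placeholders for the real buffer and `*si.EventRecord`.

```go
package main

import (
	"fmt"
	"math"
)

// recent returns at most count of the newest entries in history.
// A count larger than len(history) is clamped, so math.MaxUint64
// effectively means "everything that is stored".
func recent(history []string, count uint64) []string {
	total := uint64(len(history))
	if count > total {
		count = total
	}
	return history[total-count:]
}

func main() {
	history := []string{"e1", "e2", "e3"}
	fmt.Println(len(recent(history, 0)))              // no past events
	fmt.Println(len(recent(history, 2)))              // the two newest
	fmt.Println(len(recent(history, math.MaxUint64))) // all stored events
}
```

With this behaviour, 0 keeps its documented meaning of "no past events" and no special value is needed for "all events".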





Re: [PR] [YUNIKORN-1709] Add event streaming logic [yunikorn-core]

Posted by "pbacsko (via GitHub)" <gi...@apache.org>.
pbacsko commented on code in PR #533:
URL: https://github.com/apache/yunikorn-core/pull/533#discussion_r1407387393


##########
pkg/scheduler/objects/common_test.go:
##########


Review Comment:
   Interface changes in EventSystem (two new methods).





Re: [PR] [YUNIKORN-1709] Add event streaming logic [yunikorn-core]

Posted by "pbacsko (via GitHub)" <gi...@apache.org>.
pbacsko commented on PR #533:
URL: https://github.com/apache/yunikorn-core/pull/533#issuecomment-1798658787

   Note: we can hold off merging until 1.4.0 branching is ready. This is intended for YuniKorn 1.5.0.




Re: [PR] [YUNIKORN-1709] Add event streaming logic [yunikorn-core]

Posted by "pbacsko (via GitHub)" <gi...@apache.org>.
pbacsko commented on PR #533:
URL: https://github.com/apache/yunikorn-core/pull/533#issuecomment-1807955295

   TODO:
   - HTTP status codes on return & error object
   - idle timeout
   - coverage




Re: [PR] [YUNIKORN-1709] Add event streaming logic [yunikorn-core]

Posted by "wilfred-s (via GitHub)" <gi...@apache.org>.
wilfred-s commented on code in PR #533:
URL: https://github.com/apache/yunikorn-core/pull/533#discussion_r1482258739


##########
pkg/events/event_streaming.go:
##########
@@ -0,0 +1,179 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package events
+
+import (
+	"sync"
+	"time"
+
+	"go.uber.org/zap"
+
+	"github.com/apache/yunikorn-core/pkg/log"
+	"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
+)
+
+const defaultChannelBufSize = 1000
+
+// EventStreaming implements the event streaming logic.
+// New events are immediately forwarded to all active consumers.
+type EventStreaming struct {
+	buffer       *eventRingBuffer
+	stopCh       chan struct{}
+	eventStreams map[*EventStream]eventConsumerDetails
+	sync.Mutex
+}
+
+type eventConsumerDetails struct {
+	local     chan *si.EventRecord
+	consumer  chan<- *si.EventRecord
+	stopCh    chan struct{}
+	name      string
+	createdAt time.Time
+}
+
+// EventStream handle type returned to the client that wants to capture the stream of events.
+type EventStream struct {
+	Events <-chan *si.EventRecord
+}
+
+// PublishEvent publishes an event to all event stream consumers.
+//
+// The streaming logic uses bridging to ensure proper ordering of existing and new events.
+// Events are sent to the "local" channel from where it is forwarded to the "consumer" channel.
+//
+// If "local" is full, it means that the consumer side has not processed the events at an appropriate pace.
+// Such a consumer is removed and the related channels are closed.
+func (e *EventStreaming) PublishEvent(event *si.EventRecord) {
+	e.Lock()
+	defer e.Unlock()
+
+	for consumer, details := range e.eventStreams {
+		if len(details.local) == defaultChannelBufSize {
+			log.Log(log.Events).Warn("Listener buffer full due to potentially slow consumer, removing it")
+			e.removeEventStream(consumer)
+			continue
+		}
+
+		details.local <- event
+	}
+}
+
+// CreateEventStream sets up event streaming for a consumer. The returned EventStream object
+// contains a channel that can be used for reading.
+//
+// When a consumer is finished, it must call RemoveEventStream to free up resources.
+//
+// Consumers have an arbitrary name for logging purposes. The "count" parameter defines the number
+// of maximum historical events from the ring buffer. "0" is a valid value and means no past events.
+func (e *EventStreaming) CreateEventStream(name string, count uint64) *EventStream {
+	consumer := make(chan *si.EventRecord, defaultChannelBufSize)
+	stream := &EventStream{
+		Events: consumer,
+	}
+	local := make(chan *si.EventRecord, defaultChannelBufSize)
+	stop := make(chan struct{})
+	e.createEventStreamInternal(stream, local, consumer, stop, name, count)
+	history := e.buffer.GetRecentEvents(count)
+
+	go func(consumer chan<- *si.EventRecord, local <-chan *si.EventRecord, stop <-chan struct{}) {
+		// Store the refs of historical events; it's possible that some events are added to the
+		// ring buffer and also to "local" channel.
+		// It is because we use two separate locks, so event updates are not atomic.
+		// Example: an event has been just added to the ring buffer (before createEventStreamInternal()),
+		// and execution is about to enter PublishEvent(); at this point we have an updated "eventStreams"
+		// map, so "local" will also contain the new event.
+		seen := make(map[*si.EventRecord]bool)
+		for _, event := range history {
+			consumer <- event
+			seen[event] = true
+		}
+		for {
+			select {
+			case <-e.stopCh:
+				close(consumer)
+				return
+			case <-stop:
+				close(consumer)
+				return
+			case event := <-local:
+				if seen[event] {
+					continue
+				}
+				// since events are processed in a single goroutine, doubling is no longer
+				// possible at this point
+				seen = make(map[*si.EventRecord]bool)
+				consumer <- event
+			}
+		}
+	}(consumer, local, stop)
+
+	log.Log(log.Events).Info("Created event stream", zap.String("consumer name", name))
+	return stream
+}
+
+func (e *EventStreaming) createEventStreamInternal(stream *EventStream,
+	local chan *si.EventRecord,
+	consumer chan *si.EventRecord,
+	stop chan struct{},
+	name string,
+	count uint64) {

Review Comment:
   We need a follow-up JIRA to clean that up, @pbacsko.
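
For readers following the goroutine quoted in the diff above, here is a simplified, channel-free sketch of its de-duplication idea: history is emitted first, and a live event is skipped if the same pointer was already emitted from history. The `event` type and the `bridge` function are hypothetical stand-ins; the real code works on channels and `*si.EventRecord`.

```go
package main

import "fmt"

type event struct{ msg string }

// bridge emits history first, then live events, skipping live events that were
// already emitted as part of history. Identity is by pointer, as in the diff.
func bridge(history, live []*event) []*event {
	out := make([]*event, 0, len(history)+len(live))
	seen := make(map[*event]bool, len(history))
	for _, ev := range history {
		out = append(out, ev)
		seen[ev] = true
	}
	for _, ev := range live {
		if seen[ev] {
			continue
		}
		// First genuinely new event: later ones cannot be duplicates,
		// so the set is dropped (lookups on a nil map return false).
		seen = nil
		out = append(out, ev)
	}
	return out
}

func main() {
	a, b, c := &event{"a"}, &event{"b"}, &event{"c"}
	// b was published while the stream was being set up, so it appears twice.
	for _, ev := range bridge([]*event{a, b}, []*event{b, c}) {
		fmt.Println(ev.msg)
	}
}
```

Dropping the set after the first new event relies on the live events arriving in publication order, which holds in the diff because they pass through a single channel.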





Re: [PR] [YUNIKORN-1709] Add event streaming logic [yunikorn-core]

Posted by "pbacsko (via GitHub)" <gi...@apache.org>.
pbacsko commented on code in PR #533:
URL: https://github.com/apache/yunikorn-core/pull/533#discussion_r1482870932


##########
pkg/events/event_streaming.go:
##########
@@ -0,0 +1,179 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package events
+
+import (
+	"sync"
+	"time"
+
+	"go.uber.org/zap"
+
+	"github.com/apache/yunikorn-core/pkg/log"
+	"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
+)
+
+const defaultChannelBufSize = 1000
+
+// EventStreaming implements the event streaming logic.
+// New events are immediately forwarded to all active consumers.
+type EventStreaming struct {
+	buffer       *eventRingBuffer
+	stopCh       chan struct{}
+	eventStreams map[*EventStream]eventConsumerDetails
+	sync.Mutex
+}
+
+type eventConsumerDetails struct {
+	local     chan *si.EventRecord
+	consumer  chan<- *si.EventRecord
+	stopCh    chan struct{}
+	name      string
+	createdAt time.Time
+}
+
+// EventStream handle type returned to the client that wants to capture the stream of events.
+type EventStream struct {
+	Events <-chan *si.EventRecord
+}
+
+// PublishEvent publishes an event to all event stream consumers.
+//
+// The streaming logic uses bridging to ensure proper ordering of existing and new events.
+// Events are sent to the "local" channel from where it is forwarded to the "consumer" channel.
+//
+// If "local" is full, it means that the consumer side has not processed the events at an appropriate pace.
+// Such a consumer is removed and the related channels are closed.
+func (e *EventStreaming) PublishEvent(event *si.EventRecord) {
+	e.Lock()
+	defer e.Unlock()
+
+	for consumer, details := range e.eventStreams {
+		if len(details.local) == defaultChannelBufSize {
+			log.Log(log.Events).Warn("Listener buffer full due to potentially slow consumer, removing it")
+			e.removeEventStream(consumer)
+			continue
+		}
+
+		details.local <- event
+	}
+}
+
+// CreateEventStream sets up event streaming for a consumer. The returned EventStream object
+// contains a channel that can be used for reading.
+//
+// When a consumer is finished, it must call RemoveEventStream to free up resources.
+//
+// Consumers have an arbitrary name for logging purposes. The "count" parameter defines the number
+// of maximum historical events from the ring buffer. "0" is a valid value and means no past events.
+func (e *EventStreaming) CreateEventStream(name string, count uint64) *EventStream {
+	consumer := make(chan *si.EventRecord, defaultChannelBufSize)
+	stream := &EventStream{
+		Events: consumer,
+	}
+	local := make(chan *si.EventRecord, defaultChannelBufSize)
+	stop := make(chan struct{})
+	e.createEventStreamInternal(stream, local, consumer, stop, name, count)
+	history := e.buffer.GetRecentEvents(count)
+
+	go func(consumer chan<- *si.EventRecord, local <-chan *si.EventRecord, stop <-chan struct{}) {
+		// Store the refs of historical events; it's possible that some events are added to the
+		// ring buffer and also to "local" channel.
+		// It is because we use two separate locks, so event updates are not atomic.
+		// Example: an event has been just added to the ring buffer (before createEventStreamInternal()),
+		// and execution is about to enter PublishEvent(); at this point we have an updated "eventStreams"
+		// map, so "local" will also contain the new event.
+		seen := make(map[*si.EventRecord]bool)
+		for _, event := range history {
+			consumer <- event
+			seen[event] = true
+		}
+		for {
+			select {
+			case <-e.stopCh:
+				close(consumer)
+				return
+			case <-stop:
+				close(consumer)
+				return
+			case event := <-local:
+				if seen[event] {
+					continue
+				}
+				// since events are processed in a single goroutine, doubling is no longer
+				// possible at this point
+				seen = make(map[*si.EventRecord]bool)
+				consumer <- event
+			}
+		}
+	}(consumer, local, stop)
+
+	log.Log(log.Events).Info("Created event stream", zap.String("consumer name", name))
+	return stream
+}
+
+func (e *EventStreaming) createEventStreamInternal(stream *EventStream,
+	local chan *si.EventRecord,
+	consumer chan *si.EventRecord,
+	stop chan struct{},
+	name string,
+	count uint64) {

Review Comment:
   Correct. I actually have a list of small, unrelated code changes that are needed; I'm adding this to the list.





Re: [PR] [YUNIKORN-1709] Add event streaming logic [yunikorn-core]

Posted by "zhuqi-lucas (via GitHub)" <gi...@apache.org>.
zhuqi-lucas commented on code in PR #533:
URL: https://github.com/apache/yunikorn-core/pull/533#discussion_r1409245366


##########
pkg/events/event_streaming.go:
##########
@@ -0,0 +1,180 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package events
+
+import (
+	"sync"
+	"time"
+
+	"go.uber.org/zap"
+
+	"github.com/apache/yunikorn-core/pkg/log"
+	"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
+)
+
+const defaultChannelBufSize = 100
+
+// EventStreaming implements the event streaming logic.
+// New events are immediately forwarded to all active consumers.
+type EventStreaming struct {
+	buffer       *eventRingBuffer
+	stopCh       chan struct{}
+	eventStreams map[*EventStream]eventConsumerDetails
+	sync.Mutex
+}
+
+type eventConsumerDetails struct {
+	local     chan *si.EventRecord
+	consumer  chan<- *si.EventRecord
+	stopCh    chan struct{}
+	name      string
+	createdAt time.Time
+}
+
+// EventStream handle type returned to the client that wants to capture the stream of events.
+type EventStream struct {
+	Events <-chan *si.EventRecord
+}
+
+// PublishEvent publishes an event to all event stream consumers.
+//
+// The streaming logic uses bridging to ensure proper ordering of existing and new events.
+// Events are sent to the "local" channel from where it is forwarded to the "consumer" channel.
+//
+// If "local" is full, it means that the consumer side has not processed the events at an appropriate pace.
+// Such a consumer is removed and the related channels are closed.
+func (e *EventStreaming) PublishEvent(event *si.EventRecord) {
+	e.Lock()
+	defer e.Unlock()
+
+	for consumer, details := range e.eventStreams {

Review Comment:
   @wilfred-s @pbacsko I think the consumer here is a good place for us to add an internal consumer, the new trackingService; we can at least use it to stream resource and aggregated tracking events.
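
As context for the loop this comment is attached to, the following sketch mimics the slow-consumer handling described in the `PublishEvent` doc comment: a stream whose buffer is full is removed and closed instead of blocking the publisher. The `hub` type, the string events, and the tiny buffer size are assumptions made for the example; locking is omitted because the sketch runs in a single goroutine.

```go
package main

import "fmt"

const bufSize = 2 // tiny buffer so the example fills up quickly

// hub is a simplified, single-goroutine stand-in for EventStreaming.
type hub struct {
	streams map[string]chan string
}

// publish delivers event to every stream whose buffer has room and
// removes (and closes) any stream whose buffer is already full.
func (h *hub) publish(event string) {
	for name, ch := range h.streams {
		if len(ch) == cap(ch) {
			delete(h.streams, name) // deleting during range is allowed in Go
			close(ch)
			continue
		}
		ch <- event
	}
}

func main() {
	h := &hub{streams: map[string]chan string{
		"slow": make(chan string, bufSize),
	}}
	for i := 0; i < 3; i++ {
		h.publish(fmt.Sprintf("event-%d", i))
	}
	fmt.Println("remaining streams:", len(h.streams))
}
```

An internal consumer such as the proposed trackingService would be subject to the same rule, so it would need to drain its channel promptly or risk being dropped.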





Re: [PR] [YUNIKORN-1709] Add event streaming logic [yunikorn-core]

Posted by "zhuqi-lucas (via GitHub)" <gi...@apache.org>.
zhuqi-lucas commented on code in PR #533:
URL: https://github.com/apache/yunikorn-core/pull/533#discussion_r1409245366


##########
pkg/events/event_streaming.go:
##########
@@ -0,0 +1,180 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package events
+
+import (
+	"sync"
+	"time"
+
+	"go.uber.org/zap"
+
+	"github.com/apache/yunikorn-core/pkg/log"
+	"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
+)
+
+const defaultChannelBufSize = 100
+
+// EventStreaming implements the event streaming logic.
+// New events are immediately forwarded to all active consumers.
+type EventStreaming struct {
+	buffer       *eventRingBuffer
+	stopCh       chan struct{}
+	eventStreams map[*EventStream]eventConsumerDetails
+	sync.Mutex
+}
+
+type eventConsumerDetails struct {
+	local     chan *si.EventRecord
+	consumer  chan<- *si.EventRecord
+	stopCh    chan struct{}
+	name      string
+	createdAt time.Time
+}
+
+// EventStream handle type returned to the client that wants to capture the stream of events.
+type EventStream struct {
+	Events <-chan *si.EventRecord
+}
+
+// PublishEvent publishes an event to all event stream consumers.
+//
+// The streaming logic uses bridging to ensure proper ordering of existing and new events.
+// Events are sent to the "local" channel from where it is forwarded to the "consumer" channel.
+//
+// If "local" is full, it means that the consumer side has not processed the events at an appropriate pace.
+// Such a consumer is removed and the related channels are closed.
+func (e *EventStreaming) PublishEvent(event *si.EventRecord) {
+	e.Lock()
+	defer e.Unlock()
+
+	for consumer, details := range e.eventStreams {

Review Comment:
   @wilfred-s @pbacsko I think the consumer here is a good place to plug in an internal consumer, the new trackingService. We could at least use it to stream resource-related events and aggregated tracking events.





Re: [PR] [YUNIKORN-1709] Add event streaming logic [yunikorn-core]

Posted by "pbacsko (via GitHub)" <gi...@apache.org>.
pbacsko commented on code in PR #533:
URL: https://github.com/apache/yunikorn-core/pull/533#discussion_r1409257636


##########
pkg/events/event_streaming.go:
##########
@@ -0,0 +1,180 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package events
+
+import (
+	"sync"
+	"time"
+
+	"go.uber.org/zap"
+
+	"github.com/apache/yunikorn-core/pkg/log"
+	"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
+)
+
+const defaultChannelBufSize = 100
+
+// EventStreaming implements the event streaming logic.
+// New events are immediately forwarded to all active consumers.
+type EventStreaming struct {
+	buffer       *eventRingBuffer
+	stopCh       chan struct{}
+	eventStreams map[*EventStream]eventConsumerDetails
+	sync.Mutex
+}
+
+type eventConsumerDetails struct {
+	local     chan *si.EventRecord
+	consumer  chan<- *si.EventRecord
+	stopCh    chan struct{}
+	name      string
+	createdAt time.Time
+}
+
+// EventStream handle type returned to the client that wants to capture the stream of events.
+type EventStream struct {
+	Events <-chan *si.EventRecord
+}
+
+// PublishEvent publishes an event to all event stream consumers.
+//
+// The streaming logic uses bridging to ensure proper ordering of existing and new events.
+// Events are sent to the "local" channel from where it is forwarded to the "consumer" channel.
+//
+// If "local" is full, it means that the consumer side has not processed the events at an appropriate pace.
+// Such a consumer is removed and the related channels are closed.
+func (e *EventStreaming) PublishEvent(event *si.EventRecord) {
+	e.Lock()
+	defer e.Unlock()
+
+	for consumer, details := range e.eventStreams {

Review Comment:
   If we decide to do this internally, we might as well just run the aggregation on a separate goroutine.
   
   Wilfred's comments:
   "I think we can simply use the existing events and push the whole aggregation out of YuniKorn. The only thing that seems to be missing is the node instance type. If that is available just listening to the app and node events would allow creating the summary outside of the scheduler.
   [...]
   That is the best option. A scheduler should schedule. It is not for providing statistics."
   
   To me it sounds like a separate application.





[GitHub] [yunikorn-core] pbacsko commented on a diff in pull request #533: [WIP] [YUNIKORN-1709] Store events separately for event streaming

Posted by "pbacsko (via GitHub)" <gi...@apache.org>.
pbacsko commented on code in PR #533:
URL: https://github.com/apache/yunikorn-core/pull/533#discussion_r1181641754


##########
pkg/events/event_ringbuffer.go:
##########
@@ -0,0 +1,175 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package events
+
+import (
+	"time"
+
+	"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
+)
+
+const latestUnset int64 = -1 << 63
+
+var now = time.Now
+
+// EventRingBuffer A specialized circular buffer to store event objects.
+//
+// Unlike to regular circular buffers, new entries can be added if the buffer is full. In this case,
+// the oldest entry is overwritten. This is not a classic enqueue operation, so it's named differently.
+//
+// Retrieving the records can be achieved with GetLatestEntriesCount and GetLatestEntries. Since these do not
+// remove the elements, they are not regular dequeue operations either.
+//
+// Entries have a maximum lifespan defined in nanoseconds. Cleanup of expired objects occurs when a call to
+// RemoveExpiredEntries is made.
+//
+// The buffer does not use locking, proper synchronization is delegated to the client.
+type EventRingBuffer struct {
+	events        []*si.EventRecord

Review Comment:
   IMO we need to switch to `[]si.EventRecord` if we want to store a lot of elements. We want to avoid GC interactions, so using a value type here is very much reasonable.
   
   In fact, I'm considering a special, internal type here instead of `EventRecord` with interned strings. There will be a lot of string duplications in the events (especially IDs). 
   This library looks promising: https://github.com/philpearl/intern
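
The value-type idea above can be illustrated with a minimal sketch. It assumes a hypothetical, simplified `eventRecord` struct in place of `si.EventRecord`, and the names `valueRing`, `Add` and `Latest` are illustrative only, not the PR's API. Storing records by value removes one pointer and one separate heap object per slot; string fields still reference their own backing data, which is where interning would help.

```go
package main

import "fmt"

// eventRecord is a hypothetical, simplified stand-in for si.EventRecord.
type eventRecord struct {
	ObjectID      string
	Message       string
	TimestampNano int64
}

// valueRing keeps records by value in one contiguous slice.
// Like the buffer described in the diff, adding to a full ring
// overwrites the oldest entry. It does no locking.
type valueRing struct {
	events []eventRecord
	head   int // index of the slot that the next Add writes to
	size   int // number of valid entries
}

// newValueRing creates a ring; capacity must be greater than zero.
func newValueRing(capacity int) *valueRing {
	return &valueRing{events: make([]eventRecord, capacity)}
}

// Add stores a copy of ev, overwriting the oldest entry when full.
func (r *valueRing) Add(ev eventRecord) {
	r.events[r.head] = ev
	r.head = (r.head + 1) % len(r.events)
	if r.size < len(r.events) {
		r.size++
	}
}

// Latest returns up to n of the most recent records, oldest first.
// Nothing is removed from the ring.
func (r *valueRing) Latest(n int) []eventRecord {
	if n > r.size {
		n = r.size
	}
	if n < 0 {
		n = 0
	}
	out := make([]eventRecord, 0, n)
	start := (r.head - n + len(r.events)) % len(r.events)
	for i := 0; i < n; i++ {
		out = append(out, r.events[(start+i)%len(r.events)])
	}
	return out
}

func main() {
	ring := newValueRing(3)
	for i := 1; i <= 5; i++ {
		ring.Add(eventRecord{ObjectID: fmt.Sprintf("app-%d", i)})
	}
	// Only the three newest records survive: app-3, app-4, app-5.
	for _, ev := range ring.Latest(3) {
		fmt.Println(ev.ObjectID)
	}
}
```

Whether this pays off for the real `si.EventRecord` would need measuring, since that is a generated protobuf type and may not be suitable for copying by value.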





[GitHub] [yunikorn-core] pbacsko commented on a diff in pull request #533: [WIP] [YUNIKORN-1709] Store events separately for event streaming

Posted by "pbacsko (via GitHub)" <gi...@apache.org>.
pbacsko commented on code in PR #533:
URL: https://github.com/apache/yunikorn-core/pull/533#discussion_r1181649634


##########
pkg/events/event_streaming.go:
##########
@@ -0,0 +1,192 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package events
+
+import (
+	"sync"
+	"time"
+
+	"go.uber.org/zap"
+
+	"github.com/apache/yunikorn-core/pkg/log"
+	"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
+)
+
+const defaultChannelBufSize = 100
+
+type EventStreaming struct {
+	buffer       *EventRingBuffer
+	stopCh       chan struct{}
+	eventStreams map[EventStream]consumerDetails
+	sync.Mutex
+}
+
+type consumerDetails struct {
+	local     chan *si.EventRecord
+	consumer  chan<- si.EventRecord
+	stopCh    chan struct{}
+	name      string
+	createdAt time.Time
+}
+
+// EventStream the type returned to the client who wants to capture the stream of events.
+type EventStream struct {
+	events <-chan si.EventRecord
+}
+
+type EventHistoryOpts struct {
+	interval   time.Duration
+	eventCount int
+}
+
+// PublishEvent publishes an event to event stream consumers. Events are first stored in a ring buffer,
+// then all clients are notified. Clients whose buffer is full are considered slow and removed.
+func (e *EventStreaming) PublishEvent(event *si.EventRecord) {
+	e.Lock()
+	defer e.Unlock()
+
+	e.buffer.Add(event)
+	for consumer, details := range e.eventStreams {

Review Comment:
   We can add another level of channel communication here, so updating clients will occur on a separate goroutine. So this method can look like:
   
   ```
   func (e *EventStreaming) PublishEvent(event *si.EventRecord) {
   	e.Lock()
   	defer e.Unlock()
   	e.buffer.Add(event)
   	e.events <- event
   }
   ```
   
   So the event flow will be:
   incoming event --> `e.events` --> `consumerDetails.local` --> `consumerDetails.consumer`.
   
   It's a matter of taste, this method is already fast and not affected by slow clients.
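
The extra channel hop described above could look roughly like the following sketch. It is a simplified model under stated assumptions, not the PR's code: `event`, `streaming`, `Subscribe`, `dispatch` and `Stop` are hypothetical names, and the final `local` to `consumer` hop is left out, so readers consume directly from the per-consumer `local` channel.

```go
package main

import (
	"fmt"
	"sync"
)

// event is a hypothetical stand-in for *si.EventRecord.
type event struct{ id int }

// streaming is a simplified, hypothetical model of EventStreaming.
type streaming struct {
	sync.Mutex
	events chan *event            // first hop: publisher -> dispatcher
	locals map[string]chan *event // second hop: one "local" channel per consumer
	done   chan struct{}          // closed when the dispatcher exits
}

func newStreaming(bufSize int) *streaming {
	s := &streaming{
		events: make(chan *event, bufSize),
		locals: make(map[string]chan *event),
		done:   make(chan struct{}),
	}
	go s.dispatch()
	return s
}

// Subscribe registers a consumer with its own buffered channel.
func (s *streaming) Subscribe(name string, bufSize int) <-chan *event {
	local := make(chan *event, bufSize)
	s.Lock()
	defer s.Unlock()
	s.locals[name] = local
	return local
}

// Publish only enqueues the event; fan-out happens in dispatch.
// Note that it blocks if the "events" channel itself is full.
func (s *streaming) Publish(ev *event) {
	s.events <- ev
}

// dispatch forwards each event to every consumer. A consumer whose
// buffer is full is treated as slow: its channel is closed and removed.
func (s *streaming) dispatch() {
	defer close(s.done)
	for ev := range s.events {
		s.Lock()
		for name, local := range s.locals {
			select {
			case local <- ev:
			default:
				close(local)
				delete(s.locals, name)
			}
		}
		s.Unlock()
	}
}

// Stop drains pending events, then closes the remaining consumer channels.
func (s *streaming) Stop() {
	close(s.events)
	<-s.done
	s.Lock()
	defer s.Unlock()
	for name, local := range s.locals {
		close(local)
		delete(s.locals, name)
	}
}

func main() {
	s := newStreaming(10)
	fast := s.Subscribe("fast", 10)
	slow := s.Subscribe("slow", 1)
	for i := 0; i < 3; i++ {
		s.Publish(&event{id: i})
	}
	s.Stop()
	for ev := range fast {
		fmt.Println("fast got event", ev.id)
	}
	n := 0
	for range slow {
		n++
	}
	fmt.Println("slow received", n, "event(s) before removal")
}
```

The trade-off is the one the comment hints at: the publisher does less work under the lock, at the cost of one more goroutine and one more buffer that can fill up.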





[GitHub] [yunikorn-core] pbacsko commented on a diff in pull request #533: [YUNIKORN-1709] Add event streaming logic

Posted by "pbacsko (via GitHub)" <gi...@apache.org>.
pbacsko commented on code in PR #533:
URL: https://github.com/apache/yunikorn-core/pull/533#discussion_r1181641754


##########
pkg/events/event_ringbuffer.go:
##########
@@ -0,0 +1,175 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package events
+
+import (
+	"time"
+
+	"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
+)
+
+const latestUnset int64 = -1 << 63
+
+var now = time.Now
+
+// EventRingBuffer A specialized circular buffer to store event objects.
+//
+// Unlike to regular circular buffers, new entries can be added if the buffer is full. In this case,
+// the oldest entry is overwritten. This is not a classic enqueue operation, so it's named differently.
+//
+// Retrieving the records can be achieved with GetLatestEntriesCount and GetLatestEntries. Since these do not
+// remove the elements, they are not regular dequeue operations either.
+//
+// Entries have a maximum lifespan defined in nanoseconds. Cleanup of expired objects occurs when a call to
+// RemoveExpiredEntries is made.
+//
+// The buffer does not use locking, proper synchronization is delegated to the client.
+type EventRingBuffer struct {
+	events        []*si.EventRecord

Review Comment:
   IMO we need to switch to `[]si.EventRecord` if we want to store a lot of elements. We want to avoid GC interactions with pointers, so using a value type here is very much reasonable.
   
   In fact, I'm considering a special, internal type here instead of `EventRecord` with interned strings. There will be a lot of string duplications in the events (especially IDs). 
   ~~This library looks promising: https://github.com/philpearl/intern~~
   EDIT: we need our own interning library, we cannot let the intern cache grow indefinitely. Luckily, it's not difficult.
   
   We can do this in https://issues.apache.org/jira/browse/YUNIKORN-1713.



##########
pkg/events/event_ringbuffer.go:
##########
@@ -0,0 +1,175 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package events
+
+import (
+	"time"
+
+	"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
+)
+
+const latestUnset int64 = -1 << 63
+
+var now = time.Now
+
+// EventRingBuffer A specialized circular buffer to store event objects.
+//
+// Unlike to regular circular buffers, new entries can be added if the buffer is full. In this case,
+// the oldest entry is overwritten. This is not a classic enqueue operation, so it's named differently.
+//
+// Retrieving the records can be achieved with GetLatestEntriesCount and GetLatestEntries. Since these do not
+// remove the elements, they are not regular dequeue operations either.
+//
+// Entries have a maximum lifespan defined in nanoseconds. Cleanup of expired objects occurs when a call to
+// RemoveExpiredEntries is made.
+//
+// The buffer does not use locking, proper synchronization is delegated to the client.
+type EventRingBuffer struct {
+	events        []*si.EventRecord

Review Comment:
   IMO we need to switch to `[]si.EventRecord` if we want to store a lot of elements. We want to avoid GC interactions with pointers, so using a value type here is very much reasonable.
   
   In fact, I'm considering a special, internal type here instead of `EventRecord` with interned strings. There will be a lot of string duplications in the events (especially IDs). 
   ~~This library looks promising: https://github.com/philpearl/intern~~
   EDIT: we need our own interning code, we cannot let the intern cache grow indefinitely. Luckily, it's not difficult.
   
   We can do this in https://issues.apache.org/jira/browse/YUNIKORN-1713.
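
A bounded interning cache along these lines can indeed be small. The sketch below is only one possible shape, with hypothetical names (`boundedInterner`, `Intern`, `Len`) and a deliberately crude eviction policy: once the limit is reached the whole cache is dropped. An LRU or time-based policy would be an equally valid choice.

```go
package main

import (
	"fmt"
	"strings"
)

// boundedInterner is a hypothetical string interner whose cache never
// holds more than maxEntries strings. It does no locking, so callers
// must synchronize access themselves.
type boundedInterner struct {
	maxEntries int
	cache      map[string]string
}

func newBoundedInterner(maxEntries int) *boundedInterner {
	return &boundedInterner{
		maxEntries: maxEntries,
		cache:      make(map[string]string),
	}
}

// Intern returns the canonical copy of s, adding it to the cache if needed.
func (b *boundedInterner) Intern(s string) string {
	if canonical, ok := b.cache[s]; ok {
		return canonical
	}
	if len(b.cache) >= b.maxEntries {
		// Crude eviction: forget everything and start over.
		b.cache = make(map[string]string)
	}
	// Clone so the cache does not pin a larger buffer that s may point into.
	canonical := strings.Clone(s)
	b.cache[canonical] = canonical
	return canonical
}

// Len reports the number of cached strings.
func (b *boundedInterner) Len() int {
	return len(b.cache)
}

func main() {
	in := newBoundedInterner(2)
	a := in.Intern("app-0001")
	b := in.Intern("app-0001")
	fmt.Println(a == b, in.Len())

	in.Intern("node-1")
	in.Intern("node-2") // limit reached, cache is reset before adding
	fmt.Println(in.Len())
}
```

Strings handed out before a reset stay valid; they simply stop being shared with later lookups.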





Re: [PR] [YUNIKORN-1709] Add event streaming logic [yunikorn-core]

Posted by "pbacsko (via GitHub)" <gi...@apache.org>.
pbacsko commented on code in PR #533:
URL: https://github.com/apache/yunikorn-core/pull/533#discussion_r1407391656


##########
pkg/webservice/webservice.go:
##########
@@ -63,12 +64,13 @@ func loggingHandler(inner http.Handler, name string) http.HandlerFunc {
 // TODO we need the port to be configurable
 func (m *WebService) StartWebApp() {
 	router := newRouter()
+	// Important: do not use ReadTimeout, WriteTimeout or IdleTimeout because those can break the event streaming

Review Comment:
   `ReadHeaderTimeout` is fine; it's the other timeouts that can cause issues. I'll check out `ResponseController`, it looks relevant to us.





Re: [PR] [YUNIKORN-1709] Add event streaming logic [yunikorn-core]

Posted by "zhuqi-lucas (via GitHub)" <gi...@apache.org>.
zhuqi-lucas commented on code in PR #533:
URL: https://github.com/apache/yunikorn-core/pull/533#discussion_r1411763098


##########
pkg/events/event_streaming.go:
##########
@@ -0,0 +1,180 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package events
+
+import (
+	"sync"
+	"time"
+
+	"go.uber.org/zap"
+
+	"github.com/apache/yunikorn-core/pkg/log"
+	"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
+)
+
+const defaultChannelBufSize = 1000
+
+// EventStreaming implements the event streaming logic.
+// New events are immediately forwarded to all active consumers.
+type EventStreaming struct {
+	buffer       *eventRingBuffer
+	stopCh       chan struct{}
+	eventStreams map[*EventStream]eventConsumerDetails
+	sync.Mutex
+}
+
+type eventConsumerDetails struct {
+	local     chan *si.EventRecord
+	consumer  chan<- *si.EventRecord
+	stopCh    chan struct{}
+	name      string
+	createdAt time.Time
+}
+
+// EventStream handle type returned to the client that wants to capture the stream of events.
+type EventStream struct {
+	Events <-chan *si.EventRecord
+}
+
+// PublishEvent publishes an event to all event stream consumers.
+//
+// The streaming logic uses bridging to ensure proper ordering of existing and new events.
+// Events are sent to the "local" channel from where it is forwarded to the "consumer" channel.
+//
+// If "local" is full, it means that the consumer side has not processed the events at an appropriate pace.
+// Such a consumer is removed and the related channels are closed.
+func (e *EventStreaming) PublishEvent(event *si.EventRecord) {
+	e.Lock()
+	defer e.Unlock()
+
+	for consumer, details := range e.eventStreams {
+		if len(details.local) == defaultChannelBufSize {
+			log.Log(log.Events).Warn("Listener buffer full due to potentially slow consumer, removing it")
+			e.removeEventStream(consumer)
+			continue
+		}
+
+		details.local <- event
+	}
+}
+
+// CreateEventStream sets up event streaming for a consumer. The returned EventStream object
+// contains a channel that can be used for reading.
+//
+// When a consumer is finished, it must call RemoveEventStream to free up resources.
+//
+// Consumers have an arbitrary name for logging purposes. The "count" parameter defines the number
+// of maximum historical events from the ring buffer.
+func (e *EventStreaming) CreateEventStream(name string, count uint64) *EventStream {
+	consumer := make(chan *si.EventRecord, defaultChannelBufSize)
+	stream := &EventStream{
+		Events: consumer,
+	}
+	local := make(chan *si.EventRecord, defaultChannelBufSize)
+	stop := make(chan struct{})
+	history := e.createEventStreamInternal(stream, local, consumer, stop, name, count)
+
+	go func(consumer chan<- *si.EventRecord, local <-chan *si.EventRecord, stop <-chan struct{}) {
+		// Store the refs of historical events; it's possible that some events are added to the
+		// ring buffer and also to "local" channel.
+		// It is because we use two separate locks, so event updates are not atomic.
+		// Example: an event has been just added to the ring buffer (before createEventStreamInternal()),
+		// and execution is about to enter PublishEvent(); at this point we have an updated "eventStreams"
+		// map, so "local" will also contain the new event.
+		seen := make(map[*si.EventRecord]bool)
+		for _, event := range history {
+			consumer <- event
+			seen[event] = true
+		}
+		for {
+			select {
+			case <-e.stopCh:
+				close(consumer)
+				return
+			case <-stop:
+				close(consumer)
+				return
+			case event := <-local:
+				if seen[event] {
+					continue
+				}
+				// since events are processed in a single goroutine, doubling is no longer
+				// possible at this point
+				seen = make(map[*si.EventRecord]bool)
+				consumer <- event
+			}
+		}
+	}(consumer, local, stop)
+
+	log.Log(log.Events).Info("Created event stream", zap.String("consumer name", name))
+	return stream
+}
+
+func (e *EventStreaming) createEventStreamInternal(stream *EventStream,

Review Comment:
   Nit: the function is named `createEventStreamInternal`, but it also returns the history events. Maybe we can fetch the history events after calling this method instead.





[GitHub] [yunikorn-core] pbacsko commented on a diff in pull request #533: [WIP] [YUNIKORN-1709] Store events separately for event streaming

Posted by "pbacsko (via GitHub)" <gi...@apache.org>.
pbacsko commented on code in PR #533:
URL: https://github.com/apache/yunikorn-core/pull/533#discussion_r1181641754


##########
pkg/events/event_ringbuffer.go:
##########
@@ -0,0 +1,175 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package events
+
+import (
+	"time"
+
+	"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
+)
+
+const latestUnset int64 = -1 << 63
+
+var now = time.Now
+
+// EventRingBuffer A specialized circular buffer to store event objects.
+//
+// Unlike to regular circular buffers, new entries can be added if the buffer is full. In this case,
+// the oldest entry is overwritten. This is not a classic enqueue operation, so it's named differently.
+//
+// Retrieving the records can be achieved with GetLatestEntriesCount and GetLatestEntries. Since these do not
+// remove the elements, they are not regular dequeue operations either.
+//
+// Entries have a maximum lifespan defined in nanoseconds. Cleanup of expired objects occurs when a call to
+// RemoveExpiredEntries is made.
+//
+// The buffer does not use locking, proper synchronization is delegated to the client.
+type EventRingBuffer struct {
+	events        []*si.EventRecord

Review Comment:
   IMO we need to switch to `[]si.EventRecord` if we want to store a lot of elements. We want to avoid GC interactions, so using a value type here is very much reasonable.
   
   In fact, I'm considering a special, internal type here instead of `EventRecord` with interned strings. There will be a lot of string duplications in the events (especially IDs). 
   This library looks promising: https://github.com/philpearl/intern
   
   We can do this in https://issues.apache.org/jira/browse/YUNIKORN-1713.





Re: [PR] [YUNIKORN-1709] Add event streaming logic [yunikorn-core]

Posted by "pbacsko (via GitHub)" <gi...@apache.org>.
pbacsko commented on code in PR #533:
URL: https://github.com/apache/yunikorn-core/pull/533#discussion_r1407388102


##########
pkg/webservice/handlers.go:
##########
@@ -918,3 +918,61 @@ func getEvents(w http.ResponseWriter, r *http.Request) {
 		buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
 	}
 }
+
+func getStream(w http.ResponseWriter, r *http.Request) {

Review Comment:
   Yes, there is a follow-up ticket for that: https://issues.apache.org/jira/browse/YUNIKORN-2147





Re: [PR] [YUNIKORN-1709] Add event streaming logic [yunikorn-core]

Posted by "zhuqi-lucas (via GitHub)" <gi...@apache.org>.
zhuqi-lucas commented on code in PR #533:
URL: https://github.com/apache/yunikorn-core/pull/533#discussion_r1412097703


##########
pkg/events/event_streaming.go:
##########
@@ -0,0 +1,179 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package events
+
+import (
+	"sync"
+	"time"
+
+	"go.uber.org/zap"
+
+	"github.com/apache/yunikorn-core/pkg/log"
+	"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
+)
+
+const defaultChannelBufSize = 1000
+
+// EventStreaming implements the event streaming logic.
+// New events are immediately forwarded to all active consumers.
+type EventStreaming struct {
+	buffer       *eventRingBuffer
+	stopCh       chan struct{}
+	eventStreams map[*EventStream]eventConsumerDetails
+	sync.Mutex
+}
+
+type eventConsumerDetails struct {
+	local     chan *si.EventRecord
+	consumer  chan<- *si.EventRecord
+	stopCh    chan struct{}
+	name      string
+	createdAt time.Time
+}
+
+// EventStream handle type returned to the client that wants to capture the stream of events.
+type EventStream struct {
+	Events <-chan *si.EventRecord
+}
+
+// PublishEvent publishes an event to all event stream consumers.
+//
+// The streaming logic uses bridging to ensure proper ordering of existing and new events.
+// Events are sent to the "local" channel, from where they are forwarded to the "consumer" channel.
+//
+// If "local" is full, it means that the consumer side has not processed the events at an appropriate pace.
+// Such a consumer is removed and the related channels are closed.
+func (e *EventStreaming) PublishEvent(event *si.EventRecord) {
+	e.Lock()
+	defer e.Unlock()
+
+	for consumer, details := range e.eventStreams {
+		if len(details.local) == defaultChannelBufSize {
+			log.Log(log.Events).Warn("Listener buffer full due to potentially slow consumer, removing it")
+			e.removeEventStream(consumer)
+			continue
+		}
+
+		details.local <- event
+	}
+}
+
+// CreateEventStream sets up event streaming for a consumer. The returned EventStream object
+// contains a channel that can be used for reading.
+//
+// When a consumer is finished, it must call RemoveEventStream to free up resources.
+//
+// Consumers have an arbitrary name for logging purposes. The "count" parameter defines the maximum
+// number of historical events to send from the ring buffer. "0" is a valid value and means no past events.
+func (e *EventStreaming) CreateEventStream(name string, count uint64) *EventStream {

Review Comment:
   Because, as a customer, I want to receive all events, including all historical events, but I don't know which count value would include all of them.
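
One possible way to address this, sketched under assumptions: a caller passes an oversized `count` and the server clamps it to the number of events the ring buffer currently holds. The names `allHistory` and `clampCount` are hypothetical; whether `GetRecentEvents` already clamps its argument is not visible in this thread.

```go
package main

import "math"

// allHistory is a hypothetical sentinel a caller could pass as "count"
// when it wants every event still held in the ring buffer.
const allHistory uint64 = math.MaxUint64

// clampCount limits the requested number of historical events to what the
// buffer actually holds, so any oversized count means "everything".
func clampCount(requested, stored uint64) uint64 {
	if requested > stored {
		return stored
	}
	return requested
}
```

With this approach a client would not need to know the buffer size: `clampCount(allHistory, stored)` always yields `stored`, while `0` still means no past events.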





Re: [PR] [YUNIKORN-1709] Add event streaming logic [yunikorn-core]

Posted by "pbacsko (via GitHub)" <gi...@apache.org>.
pbacsko commented on code in PR #533:
URL: https://github.com/apache/yunikorn-core/pull/533#discussion_r1409167968


##########
pkg/webservice/webservice.go:
##########
@@ -63,12 +64,13 @@ func loggingHandler(inner http.Handler, name string) http.HandlerFunc {
 // TODO we need the port to be configurable
 func (m *WebService) StartWebApp() {
 	router := newRouter()
+	// Important: do not use ReadTimeout, WriteTimeout or IdleTimeout because those can break the event streaming

Review Comment:
   Write timeout added





Re: [PR] [YUNIKORN-1709] Add event streaming logic [yunikorn-core]

Posted by "zhuqi-lucas (via GitHub)" <gi...@apache.org>.
zhuqi-lucas commented on code in PR #533:
URL: https://github.com/apache/yunikorn-core/pull/533#discussion_r1411766477


##########
pkg/events/event_streaming.go:
##########
@@ -0,0 +1,180 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package events
+
+import (
+	"sync"
+	"time"
+
+	"go.uber.org/zap"
+
+	"github.com/apache/yunikorn-core/pkg/log"
+	"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
+)
+
+const defaultChannelBufSize = 1000
+
+// EventStreaming implements the event streaming logic.
+// New events are immediately forwarded to all active consumers.
+type EventStreaming struct {
+	buffer       *eventRingBuffer
+	stopCh       chan struct{}
+	eventStreams map[*EventStream]eventConsumerDetails
+	sync.Mutex
+}
+
+type eventConsumerDetails struct {
+	local     chan *si.EventRecord
+	consumer  chan<- *si.EventRecord
+	stopCh    chan struct{}
+	name      string
+	createdAt time.Time
+}
+
+// EventStream handle type returned to the client that wants to capture the stream of events.
+type EventStream struct {
+	Events <-chan *si.EventRecord
+}
+
+// PublishEvent publishes an event to all event stream consumers.
+//
+// The streaming logic uses bridging to ensure proper ordering of existing and new events.
+// Events are sent to the "local" channel, from where they are forwarded to the "consumer" channel.
+//
+// If "local" is full, it means that the consumer side has not processed the events at an appropriate pace.
+// Such a consumer is removed and the related channels are closed.
+func (e *EventStreaming) PublishEvent(event *si.EventRecord) {
+	e.Lock()
+	defer e.Unlock()
+
+	for consumer, details := range e.eventStreams {
+		if len(details.local) == defaultChannelBufSize {
+			log.Log(log.Events).Warn("Listener buffer full due to potentially slow consumer, removing it")
+			e.removeEventStream(consumer)
+			continue
+		}
+
+		details.local <- event
+	}
+}
+
+// CreateEventStream sets up event streaming for a consumer. The returned EventStream object
+// contains a channel that can be used for reading.
+//
+// When a consumer is finished, it must call RemoveEventStream to free up resources.
+//
+// Consumers have an arbitrary name for logging purposes. The "count" parameter defines the maximum
+// number of historical events to send from the ring buffer.
+func (e *EventStreaming) CreateEventStream(name string, count uint64) *EventStream {
+	consumer := make(chan *si.EventRecord, defaultChannelBufSize)
+	stream := &EventStream{
+		Events: consumer,
+	}
+	local := make(chan *si.EventRecord, defaultChannelBufSize)
+	stop := make(chan struct{})
+	history := e.createEventStreamInternal(stream, local, consumer, stop, name, count)
+
+	go func(consumer chan<- *si.EventRecord, local <-chan *si.EventRecord, stop <-chan struct{}) {
+		// Store the refs of historical events; it's possible that some events are added to the
+		// ring buffer and also to "local" channel.
+		// It is because we use two separate locks, so event updates are not atomic.
+		// Example: an event has been just added to the ring buffer (before createEventStreamInternal()),
+		// and execution is about to enter PublishEvent(); at this point we have an updated "eventStreams"
+		// map, so "local" will also contain the new event.
+		seen := make(map[*si.EventRecord]bool)
+		for _, event := range history {
+			consumer <- event
+			seen[event] = true
+		}
+		for {
+			select {
+			case <-e.stopCh:
+				close(consumer)
+				return
+			case <-stop:
+				close(consumer)
+				return
+			case event := <-local:
+				if seen[event] {
+					continue
+				}
+				// since events are processed in a single goroutine, doubling is no longer
+				// possible at this point
+				seen = make(map[*si.EventRecord]bool)
+				consumer <- event
+			}
+		}
+	}(consumer, local, stop)
+
+	log.Log(log.Events).Info("Created event stream", zap.String("consumer name", name))
+	return stream
+}
+
+func (e *EventStreaming) createEventStreamInternal(stream *EventStream,
+	local chan *si.EventRecord,
+	consumer chan *si.EventRecord,
+	stop chan struct{},
+	name string,
+	count uint64) []*si.EventRecord {
+	// stuff that needs locking
+	e.Lock()
+	defer e.Unlock()
+
+	e.eventStreams[stream] = eventConsumerDetails{
+		local:     local,
+		consumer:  consumer,
+		stopCh:    stop,
+		name:      name,
+		createdAt: time.Now(),
+	}
+
+	return e.buffer.GetRecentEvents(count)
+}
+
+// RemoveEventStream stops the streaming for a given consumer. Must be called to avoid resource leaks.
+func (e *EventStreaming) RemoveEventStream(consumer *EventStream) {
+	e.Lock()
+	defer e.Unlock()
+
+	e.removeEventStream(consumer)
+}
+
+func (e *EventStreaming) removeEventStream(consumer *EventStream) {
+	if details, ok := e.eventStreams[consumer]; ok {
+		log.Log(log.Events).Info("Removing event stream consumer", zap.String("name", details.name),

Review Comment:
   Nit: could we log the removal reason? For example:
   
   "Connection closed for event stream client"
   "Cannot set write deadline"
   "Marshalling error"
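
A rough sketch of what carrying a reason could look like. The `removalReason` type, its constants and `removalMessage` are hypothetical and use only the standard library; in the PR itself the reason would more likely be passed as an extra zap field on the existing "Removing event stream consumer" log call.

```go
package main

import "fmt"

// removalReason is a hypothetical type describing why a stream was removed.
type removalReason string

const (
	reasonSlowConsumer  removalReason = "Listener buffer full"
	reasonConnClosed    removalReason = "Connection closed for event stream client"
	reasonWriteDeadline removalReason = "Cannot set write deadline"
	reasonMarshalling   removalReason = "Marshalling error"
)

// removalMessage builds the log line for a removed consumer, including
// the reason supplied by the caller.
func removalMessage(name string, reason removalReason) string {
	return fmt.Sprintf("Removing event stream consumer: name=%s, reason=%s", name, reason)
}
```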
   
   





Re: [PR] [YUNIKORN-1709] Add event streaming logic [yunikorn-core]

Posted by "pbacsko (via GitHub)" <gi...@apache.org>.
pbacsko commented on code in PR #533:
URL: https://github.com/apache/yunikorn-core/pull/533#discussion_r1409166784


##########
pkg/webservice/handlers.go:
##########
@@ -918,3 +918,61 @@ func getEvents(w http.ResponseWriter, r *http.Request) {
 		buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
 	}
 }
+
+func getStream(w http.ResponseWriter, r *http.Request) {
+	writeHeaders(w)
+	eventSystem := events.GetEventSystem()
+	if !eventSystem.IsEventTrackingEnabled() {
+		buildJSONErrorResponse(w, "Event tracking is disabled", http.StatusInternalServerError)
+		return
+	}
+
+	f, ok := w.(http.Flusher)
+	if !ok {
+		buildJSONErrorResponse(w, "Writer does not implement http.Flusher", http.StatusInternalServerError)
+		return
+	}
+
+	var count uint64
+	if countStr := r.URL.Query().Get("count"); countStr != "" {
+		var err error
+		count, err = strconv.ParseUint(countStr, 10, 64)
+		if err != nil {
+			buildJSONErrorResponse(w, err.Error(), http.StatusBadRequest)
+			return
+		}
+	}
+
+	enc := json.NewEncoder(w)
+	stream := eventSystem.CreateEventStream(r.Host, count)
+
+	// Reading events in an infinite loop until either the client disconnects or Yunikorn closes the channel.
+	// This results in a persistent HTTP connection where the message body is never closed.
+	// We don't use timeouts, since HTTP 1.1 clients are expected to handle persistent connections by default.

Review Comment:
   Change added. The new code calls `ResponseController.SetWriteDeadline()` before each send.





Re: [PR] [YUNIKORN-1709] Add event streaming logic [yunikorn-core]

Posted by "pbacsko (via GitHub)" <gi...@apache.org>.
pbacsko commented on PR #533:
URL: https://github.com/apache/yunikorn-core/pull/533#issuecomment-1823261729

   Note: any timeout on the HTTP server can break the streaming (WriteTimeout for HTTP/1.1; ReadTimeout and IdleTimeout for HTTP/2). We should either start a separate server instance for streaming or explicitly document (or even test) that timeouts must not be used.

