You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by no...@apache.org on 2020/12/29 12:45:04 UTC
[buildstream] 06/16: Add len of session/total elements members to Stream
This is an automated email from the ASF dual-hosted git repository.
not-in-ldap pushed a commit to branch tpollard/buildsubtemp
in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit 8c3f90583aba9cb0442b0025bdc67745f15d170a
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Fri Sep 27 14:51:53 2019 +0100
Add len of session/total elements members to Stream
---
src/buildstream/_frontend/status.py | 4 ++--
src/buildstream/_frontend/widget.py | 4 ++--
src/buildstream/_scheduler/scheduler.py | 5 ++++-
src/buildstream/_stream.py | 19 +++++++++++++++----
4 files changed, 23 insertions(+), 9 deletions(-)
diff --git a/src/buildstream/_frontend/status.py b/src/buildstream/_frontend/status.py
index 8da7df0..c648e93 100644
--- a/src/buildstream/_frontend/status.py
+++ b/src/buildstream/_frontend/status.py
@@ -373,8 +373,8 @@ class _StatusHeader():
#
# ========= 00:00:00 project-name (143/387) =========
#
- session = str(len(self._stream.session_elements))
- total = str(len(self._stream.total_elements))
+ session = self._stream.len_session_elements
+ total = self._stream.len_total_elements
size = 0
text = ''
diff --git a/src/buildstream/_frontend/widget.py b/src/buildstream/_frontend/widget.py
index 181ee7d..d40b87c 100644
--- a/src/buildstream/_frontend/widget.py
+++ b/src/buildstream/_frontend/widget.py
@@ -565,8 +565,8 @@ class LogLine(Widget):
text += self.content_profile.fmt("Pipeline Summary\n", bold=True)
values = OrderedDict()
- values['Total'] = self.content_profile.fmt(str(len(stream.total_elements)))
- values['Session'] = self.content_profile.fmt(str(len(stream.session_elements)))
+ values['Total'] = self.content_profile.fmt(stream.len_total_elements)
+ values['Session'] = self.content_profile.fmt(stream.len_session_elements)
processed_maxlen = 1
skipped_maxlen = 1
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 6677c76..978ee58 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -68,6 +68,7 @@ class NotificationType(FastEnum):
EXCEPTION = "exception"
START = "start"
TASK_GROUPS = "task_groups"
+ ELEMENT_TOTALS = "element_totals"
# Notification()
@@ -91,7 +92,8 @@ class Notification():
message=None,
task_error=None,
exception=None,
- task_groups=None):
+ task_groups=None,
+ element_totals=None):
self.notification_type = notification_type
self.full_name = full_name
self.job_action = job_action
@@ -102,6 +104,7 @@ class Notification():
self.task_error = task_error # Tuple of domain & reason
self.exception = exception
self.task_groups = task_groups
+ self.element_totals = element_totals
# Scheduler()
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index b28e40f..a2b9b8f 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -72,6 +72,8 @@ class Stream():
self.session_elements = [] # List of elements being processed this session
self.total_elements = [] # Total list of elements based on targets
self.queues = [] # Queue objects
+ self.len_session_elements = None
+ self.len_total_elements = None
#
# Private members
@@ -82,7 +84,6 @@ class Stream():
self._project = None
self._pipeline = None
self._state = State(session_start) # Owned by Stream, used by Core to set state
- #self._notification_pipe_front, self._notification_pipe_back = mp.Pipe()
self._subprocess = None
self._starttime = session_start # Synchronised with Scheduler's relative start time
@@ -127,13 +128,13 @@ class Stream():
mp_context = mp.get_context(method='fork')
process_name = "stream-{}".format(func.__name__)
-
+
self._notify_front = mp.Queue()
self._notify_back = mp.Queue()
# Tell the scheduler to not use the notifier callback
self._scheduler._notify_front = self._notify_front
self._scheduler._notify_back = self._notify_back
-
+
args = list(args)
args.insert(0, self._notify_front)
args.insert(0, func)
@@ -1434,6 +1435,14 @@ class Stream():
else:
self._session_start_callback()
+ # Also send through the session & total elements list lengths for status rendering
+ element_totals = str(len(self.session_elements)), str(len(self.total_elements))
+ if self._notify_front:
+ self._notify_front.put(Notification(NotificationType.ELEMENT_TOTALS,
+ element_totals=element_totals))
+ else:
+ self.len_session_elements, self.len_total_elements = element_totals
+
status = self._scheduler.run(self.queues, self._context.get_cascache().get_casd_process())
if status == SchedStatus.ERROR:
@@ -1758,6 +1767,8 @@ class Stream():
raise SubprocessException(**notification.exception)
elif notification.notification_type == NotificationType.START:
self._session_start_callback()
+ elif notification.notification_type == NotificationType.ELEMENT_TOTALS:
+ self.len_session_elements, self.len_total_elements = notification.element_totals
else:
raise StreamError("Unrecognised notification type received")
@@ -1779,7 +1790,7 @@ class Stream():
except queue.Empty:
notification = None
break
-
+
def __getstate__(self):
# The only use-cases for pickling in BuildStream at the time of writing
# are enabling the 'spawn' method of starting child processes, and