You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by no...@apache.org on 2020/12/29 12:45:04 UTC

[buildstream] 06/16: Add len of session/total elements members to Stream

This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch tpollard/buildsubtemp
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 8c3f90583aba9cb0442b0025bdc67745f15d170a
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Fri Sep 27 14:51:53 2019 +0100

    Add len of session/total elements members to Stream
---
 src/buildstream/_frontend/status.py     |  4 ++--
 src/buildstream/_frontend/widget.py     |  4 ++--
 src/buildstream/_scheduler/scheduler.py |  5 ++++-
 src/buildstream/_stream.py              | 19 +++++++++++++++----
 4 files changed, 23 insertions(+), 9 deletions(-)

diff --git a/src/buildstream/_frontend/status.py b/src/buildstream/_frontend/status.py
index 8da7df0..c648e93 100644
--- a/src/buildstream/_frontend/status.py
+++ b/src/buildstream/_frontend/status.py
@@ -373,8 +373,8 @@ class _StatusHeader():
         #
         #  ========= 00:00:00 project-name (143/387) =========
         #
-        session = str(len(self._stream.session_elements))
-        total = str(len(self._stream.total_elements))
+        session = self._stream.len_session_elements
+        total = self._stream.len_total_elements
 
         size = 0
         text = ''
diff --git a/src/buildstream/_frontend/widget.py b/src/buildstream/_frontend/widget.py
index 181ee7d..d40b87c 100644
--- a/src/buildstream/_frontend/widget.py
+++ b/src/buildstream/_frontend/widget.py
@@ -565,8 +565,8 @@ class LogLine(Widget):
         text += self.content_profile.fmt("Pipeline Summary\n", bold=True)
         values = OrderedDict()
 
-        values['Total'] = self.content_profile.fmt(str(len(stream.total_elements)))
-        values['Session'] = self.content_profile.fmt(str(len(stream.session_elements)))
+        values['Total'] = self.content_profile.fmt(stream.len_total_elements)
+        values['Session'] = self.content_profile.fmt(stream.len_session_elements)
 
         processed_maxlen = 1
         skipped_maxlen = 1
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 6677c76..978ee58 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -68,6 +68,7 @@ class NotificationType(FastEnum):
     EXCEPTION = "exception"
     START = "start"
     TASK_GROUPS = "task_groups"
+    ELEMENT_TOTALS = "element_totals"
 
 
 # Notification()
@@ -91,7 +92,8 @@ class Notification():
                  message=None,
                  task_error=None,
                  exception=None,
-                 task_groups=None):
+                 task_groups=None,
+                 element_totals=None):
         self.notification_type = notification_type
         self.full_name = full_name
         self.job_action = job_action
@@ -102,6 +104,7 @@ class Notification():
         self.task_error = task_error  # Tuple of domain & reason
         self.exception = exception
         self.task_groups = task_groups
+        self.element_totals = element_totals
 
 
 # Scheduler()
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index b28e40f..a2b9b8f 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -72,6 +72,8 @@ class Stream():
         self.session_elements = []   # List of elements being processed this session
         self.total_elements = []     # Total list of elements based on targets
         self.queues = []             # Queue objects
+        self.len_session_elements = None
+        self.len_total_elements = None
 
         #
         # Private members
@@ -82,7 +84,6 @@ class Stream():
         self._project = None
         self._pipeline = None
         self._state = State(session_start)  # Owned by Stream, used by Core to set state
-        #self._notification_pipe_front, self._notification_pipe_back = mp.Pipe()
         self._subprocess = None
         self._starttime = session_start  # Synchronised with Scheduler's relative start time
 
@@ -127,13 +128,13 @@ class Stream():
 
         mp_context = mp.get_context(method='fork')
         process_name = "stream-{}".format(func.__name__)
-        
+
         self._notify_front = mp.Queue()
         self._notify_back = mp.Queue()
         # Tell the scheduler to not use the notifier callback
         self._scheduler._notify_front = self._notify_front
         self._scheduler._notify_back = self._notify_back
-        
+
         args = list(args)
         args.insert(0, self._notify_front)
         args.insert(0, func)
@@ -1434,6 +1435,14 @@ class Stream():
             else:
                 self._session_start_callback()
 
+        # Also send through the session & total elements list lengths for status rendering
+        element_totals = str(len(self.session_elements)), str(len(self.total_elements))
+        if self._notify_front:
+            self._notify_front.put(Notification(NotificationType.ELEMENT_TOTALS,
+                                                element_totals=element_totals))
+        else:
+            self.len_session_elements, self.len_total_elements = element_totals
+
         status = self._scheduler.run(self.queues, self._context.get_cascache().get_casd_process())
 
         if status == SchedStatus.ERROR:
@@ -1758,6 +1767,8 @@ class Stream():
             raise SubprocessException(**notification.exception)
         elif notification.notification_type == NotificationType.START:
             self._session_start_callback()
+        elif notification.notification_type == NotificationType.ELEMENT_TOTALS:
+            self.len_session_elements, self.len_total_elements = notification.element_totals
         else:
             raise StreamError("Unrecognised notification type received")
 
@@ -1779,7 +1790,7 @@ class Stream():
             except queue.Empty:
                 notification = None
                 break
-    
+
     def __getstate__(self):
         # The only use-cases for pickling in BuildStream at the time of writing
         # are enabling the 'spawn' method of starting child processes, and