You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by tv...@apache.org on 2021/02/04 07:20:11 UTC

[buildstream] 19/19: Add workarounds for queue querying in main process

This is an automated email from the ASF dual-hosted git repository.

tvb pushed a commit to branch phil/ui-split-refactor
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit dc863015320f1ff9b749a678a2a1ce4b83c006e0
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Tue Jul 16 16:48:38 2019 +0100

    Add workarounds for queue querying in main process
---
 src/buildstream/_frontend/app.py        |  3 ++-
 src/buildstream/_scheduler/scheduler.py | 29 ++++++++++++++++++-----------
 src/buildstream/_stream.py              | 13 ++++++-------
 3 files changed, 26 insertions(+), 19 deletions(-)

diff --git a/src/buildstream/_frontend/app.py b/src/buildstream/_frontend/app.py
index 90070af..7fd10b4 100644
--- a/src/buildstream/_frontend/app.py
+++ b/src/buildstream/_frontend/app.py
@@ -570,8 +570,9 @@ class App():
         if not self.stream.terminated:
             if element_job:
                 # look-up queue
+                # Issue with pickling a queue object, so for now only pass action names
                 for q in self.stream.queues:
-                    if q.action_name == action_name:
+                    if q == action_name:
                         queue = q
                 assert queue, "Job action {} does not have a corresponding queue".format(action_name)
 
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 6649865..44ebeef 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -26,6 +26,7 @@ from itertools import chain
 import signal
 import datetime
 from contextlib import contextmanager
+import queue
 
 # Local imports
 from .resources import Resources, ResourceType
@@ -55,6 +56,7 @@ class NotificationType(enum.Enum):
     EXCEPTION = "exception"
     TASK_ERROR = "task_error"
     SCHED_TERMINATE = "sched_terminate"
+    QUEUES = "queues"
 
 
 class Notification:
@@ -70,7 +72,8 @@ class Notification:
                  element=None,
                  exception=None,
                  domain=None,
-                 reason=None):
+                 reason=None,
+                 queues=None):
 
         self.notification_type = notification_type
         self.full_name = full_name
@@ -82,6 +85,7 @@ class Notification:
         self.exception = exception
         self.domain = domain
         self.reason = reason
+        self.queues = queues
 
 
 # Scheduler()
@@ -175,6 +179,14 @@ class Scheduler():
         # Hold on to the queues to process
         self.queues = queues
 
+        # Report to the main process which queues are in session,
+        # for now a list of action_names as pickling queues is
+        # causing errors. Will need actual queue object or bidirectional
+        # notification queue for error handling later.
+        queue_list = [q.action_name for q in self.queues]
+        notifcation = Notification(NotificationType.QUEUES, queues=queue_list)
+        self._notify(notifcation)
+
         # Ensure that we have a fresh new event loop, in case we want
         # to run another test in this thread.
         self.loop = asyncio.new_event_loop()
@@ -294,20 +306,17 @@ class Scheduler():
     #    queue (Queue): The Queue holding a complete job
     #    job (Job): The completed Job
     #    status (JobStatus): The status of the completed job
-    #    process_jobs (bool): If the scheduler should also process the
-    #                         job, else just generate the notification
     #
-    def job_completed(self, job, status, process_jobs=True):
+    def job_completed(self, job, status):
 
-        if process_jobs:
-            # Remove from the active jobs list
-            self._active_jobs.remove(job)
+        self._active_jobs.remove(job)
 
+        element = None
         if status == JobStatus.FAIL:
             # If it's an elementjob, we want to compare against the failure messages
             # and send the Element() instance if interactive failure handling. Note
             # this may change if the frontend is run in a separate process for pickling
-            element = job._element if (job.element_job and self._interactive_failure) else None
+            element = job._element if (job.element_job and self._interactive_failure) else element
 
         notification = Notification(NotificationType.JOB_COMPLETE,
                                     full_name=job.name,
@@ -317,9 +326,7 @@ class Scheduler():
                                     element=element)
         self._notify(notification)
 
-        if process_jobs:
-            # Now check for more jobs
-            self._sched()
+        self._sched()
 
     # check_cache_size():
     #
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 4de975e..e0f0842 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -38,7 +38,7 @@ from ._artifactelement import verify_artifact_ref, ArtifactElement
 from ._exceptions import StreamError, ImplError, BstError, ArtifactElementError, ArtifactError, set_last_task_error
 from ._message import Message, MessageType
 from ._scheduler import Scheduler, SchedStatus, TrackQueue, FetchQueue, \
-    SourcePushQueue, BuildQueue, PullQueue, ArtifactPushQueue, NotificationType, JobStatus
+    SourcePushQueue, BuildQueue, PullQueue, ArtifactPushQueue, NotificationType, JobStatus, Notification
 from ._pipeline import Pipeline, PipelineSelection
 from ._profile import Topics, PROFILER
 from ._state import State
@@ -114,7 +114,6 @@ class Stream():
         try:
             func(*args, **kwargs)
         except Exception as e:
-            from ._scheduler.scheduler import Notification, NotificationType
             queue.put(Notification(NotificationType.EXCEPTION, exception=e))
 
     def run_in_subprocess(self, func, *args, **kwargs):
@@ -367,6 +366,7 @@ class Stream():
         if track_elements:
             self._enqueue_plan(track_elements, queue=track_queue)
         self._enqueue_plan(elements)
+
         self._run()
 
     # fetch()
@@ -1646,15 +1646,14 @@ class Stream():
         elif notification.notification_type == NotificationType.JOB_COMPLETE:
             self._state.remove_task(notification.job_action, notification.full_name)
             if notification.job_status == JobStatus.FAIL:
-                if notification.failed_element:
-                    unique_id = notification.full_name
-                else:
-                    unique_id = None
-                self._state.fail_task(notification.job_action, notification.full_name, unique_id)
+                self._state.fail_task(notification.job_action, notification.full_name,
+                                      notification.failed_element, notification.element)
         elif notification.notification_type == NotificationType.EXCEPTION:
             raise notification.exception
         elif notification.notification_type == NotificationType.TASK_ERROR:
             set_last_task_error(notification.domain, notification.reason)
+        elif notification.notification_type == NotificationType.QUEUES:
+            self.queues = notification.queues
         else:
             raise StreamError("Unreccognised notification type recieved")