You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by tv...@apache.org on 2021/02/04 07:20:11 UTC
[buildstream] 19/19: Add workarounds for queue querying in main
process
This is an automated email from the ASF dual-hosted git repository.
tvb pushed a commit to branch phil/ui-split-refactor
in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit dc863015320f1ff9b749a678a2a1ce4b83c006e0
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Tue Jul 16 16:48:38 2019 +0100
Add workarounds for queue querying in main process
---
src/buildstream/_frontend/app.py | 3 ++-
src/buildstream/_scheduler/scheduler.py | 29 ++++++++++++++++++-----------
src/buildstream/_stream.py | 13 ++++++-------
3 files changed, 26 insertions(+), 19 deletions(-)
diff --git a/src/buildstream/_frontend/app.py b/src/buildstream/_frontend/app.py
index 90070af..7fd10b4 100644
--- a/src/buildstream/_frontend/app.py
+++ b/src/buildstream/_frontend/app.py
@@ -570,8 +570,9 @@ class App():
if not self.stream.terminated:
if element_job:
# look-up queue
+ # Issue with pickling a queue object, so for now only pass action names
for q in self.stream.queues:
- if q.action_name == action_name:
+ if q == action_name:
queue = q
assert queue, "Job action {} does not have a corresponding queue".format(action_name)
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 6649865..44ebeef 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -26,6 +26,7 @@ from itertools import chain
import signal
import datetime
from contextlib import contextmanager
+import queue
# Local imports
from .resources import Resources, ResourceType
@@ -55,6 +56,7 @@ class NotificationType(enum.Enum):
EXCEPTION = "exception"
TASK_ERROR = "task_error"
SCHED_TERMINATE = "sched_terminate"
+ QUEUES = "queues"
class Notification:
@@ -70,7 +72,8 @@ class Notification:
element=None,
exception=None,
domain=None,
- reason=None):
+ reason=None,
+ queues=None):
self.notification_type = notification_type
self.full_name = full_name
@@ -82,6 +85,7 @@ class Notification:
self.exception = exception
self.domain = domain
self.reason = reason
+ self.queues = queues
# Scheduler()
@@ -175,6 +179,14 @@ class Scheduler():
# Hold on to the queues to process
self.queues = queues
+ # Report to the main process which queues are in session,
+ # for now a list of action_names as pickling queues is
+ # causing errors. Will need actual queue object or bidirectional
+ # notification queue for error handling later.
+ queue_list = [q.action_name for q in self.queues]
+ notifcation = Notification(NotificationType.QUEUES, queues=queue_list)
+ self._notify(notifcation)
+
# Ensure that we have a fresh new event loop, in case we want
# to run another test in this thread.
self.loop = asyncio.new_event_loop()
@@ -294,20 +306,17 @@ class Scheduler():
# queue (Queue): The Queue holding a complete job
# job (Job): The completed Job
# status (JobStatus): The status of the completed job
- # process_jobs (bool): If the scheduler should also process the
- # job, else just generate the notification
#
- def job_completed(self, job, status, process_jobs=True):
+ def job_completed(self, job, status):
- if process_jobs:
- # Remove from the active jobs list
- self._active_jobs.remove(job)
+ self._active_jobs.remove(job)
+ element = None
if status == JobStatus.FAIL:
# If it's an elementjob, we want to compare against the failure messages
# and send the Element() instance if interactive failure handling. Note
# this may change if the frontend is run in a separate process for pickling
- element = job._element if (job.element_job and self._interactive_failure) else None
+ element = job._element if (job.element_job and self._interactive_failure) else element
notification = Notification(NotificationType.JOB_COMPLETE,
full_name=job.name,
@@ -317,9 +326,7 @@ class Scheduler():
element=element)
self._notify(notification)
- if process_jobs:
- # Now check for more jobs
- self._sched()
+ self._sched()
# check_cache_size():
#
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 4de975e..e0f0842 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -38,7 +38,7 @@ from ._artifactelement import verify_artifact_ref, ArtifactElement
from ._exceptions import StreamError, ImplError, BstError, ArtifactElementError, ArtifactError, set_last_task_error
from ._message import Message, MessageType
from ._scheduler import Scheduler, SchedStatus, TrackQueue, FetchQueue, \
- SourcePushQueue, BuildQueue, PullQueue, ArtifactPushQueue, NotificationType, JobStatus
+ SourcePushQueue, BuildQueue, PullQueue, ArtifactPushQueue, NotificationType, JobStatus, Notification
from ._pipeline import Pipeline, PipelineSelection
from ._profile import Topics, PROFILER
from ._state import State
@@ -114,7 +114,6 @@ class Stream():
try:
func(*args, **kwargs)
except Exception as e:
- from ._scheduler.scheduler import Notification, NotificationType
queue.put(Notification(NotificationType.EXCEPTION, exception=e))
def run_in_subprocess(self, func, *args, **kwargs):
@@ -367,6 +366,7 @@ class Stream():
if track_elements:
self._enqueue_plan(track_elements, queue=track_queue)
self._enqueue_plan(elements)
+
self._run()
# fetch()
@@ -1646,15 +1646,14 @@ class Stream():
elif notification.notification_type == NotificationType.JOB_COMPLETE:
self._state.remove_task(notification.job_action, notification.full_name)
if notification.job_status == JobStatus.FAIL:
- if notification.failed_element:
- unique_id = notification.full_name
- else:
- unique_id = None
- self._state.fail_task(notification.job_action, notification.full_name, unique_id)
+ self._state.fail_task(notification.job_action, notification.full_name,
+ notification.failed_element, notification.element)
elif notification.notification_type == NotificationType.EXCEPTION:
raise notification.exception
elif notification.notification_type == NotificationType.TASK_ERROR:
set_last_task_error(notification.domain, notification.reason)
+ elif notification.notification_type == NotificationType.QUEUES:
+ self.queues = notification.queues
else:
raise StreamError("Unreccognised notification type recieved")