You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by no...@apache.org on 2020/12/29 12:30:54 UTC
[buildstream] branch phil/ui-split-refactor created (now dc86301)
This is an automated email from the ASF dual-hosted git repository.
not-in-ldap pushed a change to branch phil/ui-split-refactor
in repository https://gitbox.apache.org/repos/asf/buildstream.git.
at dc86301 Add workarounds for queue querying in main process
This branch includes the following new commits:
new 456da57 element.py: Remove redundant second call to _get_cache_key()
new c5db3d8 plugin.py: cache full_name member in __init__
new df095d5 _message.py: Use element_name & element_key instead of unique_id
new b9bcb00 _message.py: Use element_name & element_key instead of unique_id
new b7b445c WIP: Refactor scheduler-frontend communication
new af619ea Add workaround for buildqueue job_complete direct callback
new ed0eabe Refer to stream-scheduler communication as notifications
new e39c940 Send scheduler notifications over a multiprocessing queue
new e0561a7 Psuedocode for subprocess context manager
new 48847cd stream.py: Minimal implementation of event loop function
new d12e62d WIP: Add a way to run stream methods in a subprocess
new be6e68f Change subprocess.run to start
new 8b32793 Move subprocess machinery into a method
new f39f4ef fixup! Move subprocess machinery into a method
new d851dec fixup! fixup! Move subprocess machinery into a method
new 9b491a9 fixup! fixup! fixup! Move subprocess machinery into a method
new c2dafd2 Some tweaks
new 07bb725 Support exception handling from the subprocess
new dc86301 Add workarounds for queue querying in main process
The 19 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
[buildstream] 03/19: _message.py: Use element_name & element_key
instead of unique_id
Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
not-in-ldap pushed a commit to branch phil/ui-split-refactor
in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit df095d5342adb9eccb8d904bb228666a947bb04a
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Wed Jul 31 16:31:47 2019 +0100
_message.py: Use element_name & element_key instead of unique_id
Adding the element full name and display key into all element related
messages removes the need to look up the plugintable via a plugin
unique_id just to retrieve the same values for logging and widget
frontend display. Relying on plugintable state is also incompatible
with running the frontend in a different process, as the table would
then exist in multiple, diverging states. An Element instance is only
passed when handling interactive failures.
The element full name is now displayed instead of the unique_id,
such as in the debugging widget. It is also displayed in place of
'name' (i.e. including any junction prefix) to be more informative.
---
src/buildstream/_basecache.py | 2 +-
src/buildstream/_cas/cascache.py | 9 +--
src/buildstream/_frontend/app.py | 28 +++----
src/buildstream/_frontend/widget.py | 40 +++++-----
src/buildstream/_loader/loader.py | 2 +-
src/buildstream/_message.py | 9 ++-
src/buildstream/_messenger.py | 39 +++++-----
src/buildstream/_pipeline.py | 2 +-
src/buildstream/_project.py | 3 +-
src/buildstream/_scheduler/jobs/elementjob.py | 7 +-
src/buildstream/_scheduler/jobs/job.py | 101 ++++++++++++++------------
src/buildstream/_scheduler/queues/queue.py | 2 +-
src/buildstream/_scheduler/scheduler.py | 16 ++--
src/buildstream/_state.py | 10 +--
src/buildstream/_stream.py | 9 ++-
src/buildstream/plugin.py | 13 +++-
src/buildstream/sandbox/_sandboxremote.py | 2 +-
src/buildstream/sandbox/sandbox.py | 23 +++---
src/buildstream/source.py | 7 +-
tests/frontend/logging.py | 2 +-
20 files changed, 171 insertions(+), 155 deletions(-)
diff --git a/src/buildstream/_basecache.py b/src/buildstream/_basecache.py
index 52b777f..0ae64ad 100644
--- a/src/buildstream/_basecache.py
+++ b/src/buildstream/_basecache.py
@@ -244,7 +244,7 @@ class BaseCache():
def _message(self, message_type, message, **kwargs):
args = dict(kwargs)
self.context.messenger.message(
- Message(None, message_type, message, **args))
+ Message(message_type, message, **args))
# _set_remotes():
#
diff --git a/src/buildstream/_cas/cascache.py b/src/buildstream/_cas/cascache.py
index dbdfa41..9bb354a 100644
--- a/src/buildstream/_cas/cascache.py
+++ b/src/buildstream/_cas/cascache.py
@@ -1248,7 +1248,6 @@ class CASQuota:
available = utils._pretty_size(available_space)
self._message(Message(
- None,
MessageType.WARN,
"Your system does not have enough available " +
"space to support the cache quota specified.",
@@ -1294,7 +1293,7 @@ class CASQuota:
# Start off with an announcement with as much info as possible
volume_size, volume_avail = self._get_cache_volume_size()
self._message(Message(
- None, MessageType.STATUS, "Starting cache cleanup",
+ MessageType.STATUS, "Starting cache cleanup",
detail=("Elements required by the current build plan:\n" + "{}\n" +
"User specified quota: {} ({})\n" +
"Cache usage: {}\n" +
@@ -1310,7 +1309,7 @@ class CASQuota:
# Do a real computation of the cache size once, just in case
self.compute_cache_size()
usage = CASCacheUsage(self)
- self._message(Message(None, MessageType.STATUS,
+ self._message(Message(MessageType.STATUS,
"Cache usage recomputed: {}".format(usage)))
# Collect digests and their remove method
@@ -1330,7 +1329,7 @@ class CASQuota:
space_saved += size
self._message(Message(
- None, MessageType.STATUS,
+ MessageType.STATUS,
"Freed {: <7} {}".format(
utils._pretty_size(size, dec_places=2),
ref)))
@@ -1373,7 +1372,7 @@ class CASQuota:
# Informational message about the side effects of the cleanup
self._message(Message(
- None, MessageType.INFO, "Cleanup completed",
+ MessageType.INFO, "Cleanup completed",
detail=("Removed {} refs and saving {} disk space.\n" +
"Cache usage is now: {}")
.format(removed_ref_count,
diff --git a/src/buildstream/_frontend/app.py b/src/buildstream/_frontend/app.py
index a5588e6..e1b1d8d 100644
--- a/src/buildstream/_frontend/app.py
+++ b/src/buildstream/_frontend/app.py
@@ -31,7 +31,6 @@ from .. import Scope
# Import various buildstream internals
from .._context import Context
-from ..plugin import Plugin
from .._project import Project
from .._exceptions import BstError, StreamError, LoadError, LoadErrorReason, AppError
from .._message import Message, MessageType, unconditional_messages
@@ -212,7 +211,8 @@ class App():
self.stream = Stream(self.context, self._session_start,
session_start_callback=self.session_start_cb,
interrupt_callback=self._interrupt_handler,
- ticker_callback=self._tick)
+ ticker_callback=self._tick,
+ interactive_failure=self._interactive_failures)
self._state = self.stream.get_state()
@@ -474,7 +474,7 @@ class App():
def _message(self, message_type, message, **kwargs):
args = dict(kwargs)
self.context.messenger.message(
- Message(None, message_type, message, **args))
+ Message(message_type, message, **args))
# Exception handler
#
@@ -559,25 +559,24 @@ class App():
# Args:
# action_name (str): The name of the action being performed,
# same as the task group, if it exists
- # full_name (str): The name of this specific task, e.g. the element name
- # unique_id (int): If an element job failed, the unique ID of the element.
+ # full_name (str): The name of this specific task, e.g. the element full name
+ # element_job (bool): If an element job failed
+ # element (Element): The Element instance, only given if an element job
+ # failed and interactive failure handling is enabled
#
- def _job_failed(self, action_name, full_name, unique_id=None):
+ def _job_failed(self, action_name, full_name, element_job=False, element=None):
# Dont attempt to handle a failure if the user has already opted to
# terminate
if not self.stream.terminated:
- if unique_id:
+ if element_job:
# look-up queue
for q in self.stream.queues:
if q.action_name == action_name:
queue = q
assert queue, "Job action {} does not have a corresponding queue".format(action_name)
- # look-up element
- element = Plugin._lookup(unique_id)
-
# Get the last failure message for additional context
- failure = self._fail_messages.get(element._unique_id)
+ failure = self._fail_messages.get(full_name)
# XXX This is dangerous, sometimes we get the job completed *before*
# the failure message reaches us ??
@@ -585,11 +584,12 @@ class App():
self._status.clear()
click.echo("\n\n\nBUG: Message handling out of sync, " +
"unable to retrieve failure message for element {}\n\n\n\n\n"
- .format(element), err=True)
+ .format(full_name), err=True)
else:
self._handle_failure(element, queue, failure)
else:
+ # Not an element_job, we don't handle the failure
click.echo("\nTerminating all jobs\n", err=True)
self.stream.terminate()
@@ -739,8 +739,8 @@ class App():
return
# Hold on to the failure messages
- if message.message_type in [MessageType.FAIL, MessageType.BUG] and message.unique_id is not None:
- self._fail_messages[message.unique_id] = message
+ if message.message_type in [MessageType.FAIL, MessageType.BUG] and message.element_name is not None:
+ self._fail_messages[message.element_name] = message
# Send to frontend if appropriate
if is_silenced and (message.message_type not in unconditional_messages):
diff --git a/src/buildstream/_frontend/widget.py b/src/buildstream/_frontend/widget.py
index fbde249..31f69a5 100644
--- a/src/buildstream/_frontend/widget.py
+++ b/src/buildstream/_frontend/widget.py
@@ -27,11 +27,10 @@ from ruamel import yaml
import click
from .profile import Profile
-from .. import Element, Consistency, Scope
+from .. import Consistency, Scope
from .. import __version__ as bst_version
from .._exceptions import ImplError
from .._message import MessageType
-from ..plugin import Plugin
# These messages are printed a bit differently
@@ -110,12 +109,12 @@ class WallclockTime(Widget):
class Debug(Widget):
def render(self, message):
- unique_id = 0 if message.unique_id is None else message.unique_id
+ element_name = "n/a" if message.element_name is None else message.element_name
text = self.format_profile.fmt('pid:')
text += self.content_profile.fmt("{: <5}".format(message.pid))
- text += self.format_profile.fmt(" id:")
- text += self.content_profile.fmt("{:0>3}".format(unique_id))
+ text += self.format_profile.fmt("element name:")
+ text += self.content_profile.fmt("{: <30}".format(element_name))
return text
@@ -181,11 +180,9 @@ class ElementName(Widget):
def render(self, message):
action_name = message.action_name
- element_id = message.task_id or message.unique_id
- if element_id is not None:
- plugin = Plugin._lookup(element_id)
- name = plugin._get_full_name()
- name = '{: <30}'.format(name)
+ element_name = message.element_name
+ if element_name is not None:
+ name = '{: <30}'.format(element_name)
else:
name = 'core activity'
name = '{: <30}'.format(name)
@@ -215,18 +212,16 @@ class CacheKey(Widget):
def render(self, message):
- element_id = message.task_id or message.unique_id
if not self._key_length:
return ""
- if element_id is None:
+ if message.element_name is None:
return ' ' * self._key_length
missing = False
key = ' ' * self._key_length
- plugin = Plugin._lookup(element_id)
- if isinstance(plugin, Element):
- _, key, missing = plugin._get_display_key()
+ if message.element_key:
+ _, key, missing = message.element_key
if message.message_type in ERROR_MESSAGES:
text = self._err_profile.fmt(key)
@@ -557,12 +552,12 @@ class LogLine(Widget):
if self._failure_messages:
values = OrderedDict()
- for element, messages in sorted(self._failure_messages.items(), key=lambda x: x[0].name):
+ for element_name, messages in sorted(self._failure_messages.items()):
for group in self._state.task_groups.values():
# Exclude the failure messages if the job didn't ultimately fail
# (e.g. succeeded on retry)
- if element.name in group.failed_tasks:
- values[element.name] = ''.join(self._render(v) for v in messages)
+ if element_name in group.failed_tasks:
+ values[element_name] = ''.join(self._render(v) for v in messages)
if values:
text += self.content_profile.fmt("Failure Summary\n", bold=True)
@@ -616,10 +611,9 @@ class LogLine(Widget):
def render(self, message):
# Track logfiles for later use
- element_id = message.task_id or message.unique_id
- if message.message_type in ERROR_MESSAGES and element_id is not None:
- plugin = Plugin._lookup(element_id)
- self._failure_messages[plugin].append(message)
+ element_name = message.element_name
+ if message.message_type in ERROR_MESSAGES and element_name is not None:
+ self._failure_messages[element_name].append(message)
return self._render(message)
@@ -666,7 +660,7 @@ class LogLine(Widget):
if message.detail:
# Identify frontend messages, we never abbreviate these
- frontend_message = not (message.task_id or message.unique_id)
+ frontend_message = not message.element_name
# Split and truncate message detail down to message_lines lines
lines = message.detail.splitlines(True)
diff --git a/src/buildstream/_loader/loader.py b/src/buildstream/_loader/loader.py
index 4b66288..061d28b 100644
--- a/src/buildstream/_loader/loader.py
+++ b/src/buildstream/_loader/loader.py
@@ -679,7 +679,7 @@ class Loader():
if self.project._warning_is_fatal(warning_token):
raise LoadError(brief, warning_token)
- message = Message(None, MessageType.WARN, brief)
+ message = Message(MessageType.WARN, brief)
self._context.messenger.message(message)
# Print warning messages if any of the specified elements have invalid names.
diff --git a/src/buildstream/_message.py b/src/buildstream/_message.py
index 7f1a939..195eba6 100644
--- a/src/buildstream/_message.py
+++ b/src/buildstream/_message.py
@@ -54,8 +54,9 @@ unconditional_messages = [
#
class Message():
- def __init__(self, unique_id, message_type, message,
- task_id=None,
+ def __init__(self, message_type, message, *,
+ element_name=None,
+ element_key=None,
detail=None,
action_name=None,
elapsed=None,
@@ -64,14 +65,14 @@ class Message():
scheduler=False):
self.message_type = message_type # Message type
self.message = message # The message string
+ self.element_name = element_name # The instance element name of the issuing plugin
+ self.element_key = element_key # The display key of the issuing plugin element
self.detail = detail # An additional detail string
self.action_name = action_name # Name of the task queue (fetch, refresh, build, etc)
self.elapsed = elapsed # The elapsed time, in timed messages
self.logfile = logfile # The log file path where commands took place
self.sandbox = sandbox # The error that caused this message used a sandbox
self.pid = os.getpid() # The process pid
- self.unique_id = unique_id # The plugin object ID issueing the message
- self.task_id = task_id # The plugin object ID of the task
self.scheduler = scheduler # Whether this is a scheduler level message
self.creation_time = datetime.datetime.now()
if message_type in (MessageType.SUCCESS, MessageType.FAIL):
diff --git a/src/buildstream/_messenger.py b/src/buildstream/_messenger.py
index d768abf..be5f12c 100644
--- a/src/buildstream/_messenger.py
+++ b/src/buildstream/_messenger.py
@@ -25,7 +25,6 @@ from . import _signals
from . import utils
from ._exceptions import BstError
from ._message import Message, MessageType
-from .plugin import Plugin
_RENDER_INTERVAL = datetime.timedelta(seconds=1)
@@ -149,16 +148,16 @@ class Messenger():
#
# Args:
# activity_name (str): The name of the activity
- # unique_id (int): Optionally, the unique id of the plugin related to the message
+ # element_name (str): Optionally, the element full name of the plugin related to the message
# detail (str): An optional detailed message, can be multiline output
# silent_nested (bool): If True, all but _message.unconditional_messages are silenced
#
@contextmanager
- def timed_activity(self, activity_name, *, unique_id=None, detail=None, silent_nested=False):
+ def timed_activity(self, activity_name, *, element_name=None, detail=None, silent_nested=False):
with self._timed_suspendable() as timedata:
try:
# Push activity depth for status messages
- message = Message(unique_id, MessageType.START, activity_name, detail=detail)
+ message = Message(MessageType.START, activity_name, detail=detail, element_name=element_name)
self.message(message)
with self.silence(actually_silence=silent_nested):
yield
@@ -167,12 +166,12 @@ class Messenger():
# Note the failure in status messages and reraise, the scheduler
# expects an error when there is an error.
elapsed = datetime.datetime.now() - timedata.start_time
- message = Message(unique_id, MessageType.FAIL, activity_name, elapsed=elapsed)
+ message = Message(MessageType.FAIL, activity_name, elapsed=elapsed, element_name=element_name)
self.message(message)
raise
elapsed = datetime.datetime.now() - timedata.start_time
- message = Message(unique_id, MessageType.SUCCESS, activity_name, elapsed=elapsed)
+ message = Message(MessageType.SUCCESS, activity_name, elapsed=elapsed, element_name=element_name)
self.message(message)
# simple_task()
@@ -181,7 +180,7 @@ class Messenger():
#
# Args:
# activity_name (str): The name of the activity
- # unique_id (int): Optionally, the unique id of the plugin related to the message
+ # element_name (str): Optionally, the element full name of the plugin related to the message
# full_name (str): Optionally, the distinguishing name of the activity, e.g. element name
# silent_nested (bool): If True, all but _message.unconditional_messages are silenced
#
@@ -189,10 +188,10 @@ class Messenger():
# Task: A Task object that represents this activity, principally used to report progress
#
@contextmanager
- def simple_task(self, activity_name, *, unique_id=None, full_name=None, silent_nested=False):
+ def simple_task(self, activity_name, *, element_name=None, full_name=None, silent_nested=False):
# Bypass use of State when none exists (e.g. tests)
if not self._state:
- with self.timed_activity(activity_name, unique_id=unique_id, silent_nested=silent_nested):
+ with self.timed_activity(activity_name, element_name=element_name, silent_nested=silent_nested):
yield
return
@@ -201,7 +200,7 @@ class Messenger():
with self._timed_suspendable() as timedata:
try:
- message = Message(unique_id, MessageType.START, activity_name)
+ message = Message(MessageType.START, activity_name, element_name=element_name)
self.message(message)
task = self._state.add_task(activity_name, full_name)
@@ -215,7 +214,7 @@ class Messenger():
except BstError:
elapsed = datetime.datetime.now() - timedata.start_time
- message = Message(unique_id, MessageType.FAIL, activity_name, elapsed=elapsed)
+ message = Message(MessageType.FAIL, activity_name, elapsed=elapsed, element_name=element_name)
self.message(message)
raise
finally:
@@ -232,7 +231,8 @@ class Messenger():
detail = "{} subtasks processed".format(task.current_progress)
else:
detail = None
- message = Message(unique_id, MessageType.SUCCESS, activity_name, elapsed=elapsed, detail=detail)
+ message = Message(MessageType.SUCCESS, activity_name, elapsed=elapsed, detail=detail,
+ element_name=element_name)
self.message(message)
# recorded_messages()
@@ -336,13 +336,12 @@ class Messenger():
EMPTYTIME = "--:--:--"
template = "[{timecode: <8}] {type: <7}"
- # If this message is associated with a plugin, print what
- # we know about the plugin.
- plugin_name = ""
- if message.unique_id:
- template += " {plugin}"
- plugin = Plugin._lookup(message.unique_id)
- plugin_name = plugin.name
+ # If this message is associated with an element or source plugin, print the
+ # full element name of the instance.
+ element_name = ""
+ if message.element_name:
+ template += " {element_name}"
+ element_name = message.element_name
template += ": {message}"
@@ -359,7 +358,7 @@ class Messenger():
timecode = "{0:02d}:{1:02d}:{2:02d}".format(hours, minutes, seconds)
text = template.format(timecode=timecode,
- plugin=plugin_name,
+ element_name=element_name,
type=message.message_type.upper(),
message=message.message,
detail=detail)
diff --git a/src/buildstream/_pipeline.py b/src/buildstream/_pipeline.py
index af60ffd..b6896b3 100644
--- a/src/buildstream/_pipeline.py
+++ b/src/buildstream/_pipeline.py
@@ -485,7 +485,7 @@ class Pipeline():
def _message(self, message_type, message, **kwargs):
args = dict(kwargs)
self._context.messenger.message(
- Message(None, message_type, message, **args))
+ Message(message_type, message, **args))
# _Planner()
diff --git a/src/buildstream/_project.py b/src/buildstream/_project.py
index 9428ab4..96635f3 100644
--- a/src/buildstream/_project.py
+++ b/src/buildstream/_project.py
@@ -459,7 +459,7 @@ class Project():
]
detail += "\n".join(lines)
self._context.messenger.message(
- Message(None, MessageType.WARN, "Ignoring redundant source references", detail=detail))
+ Message(MessageType.WARN, "Ignoring redundant source references", detail=detail))
return elements
@@ -685,7 +685,6 @@ class Project():
if not fail_on_overlap.is_none():
self._context.messenger.message(
Message(
- None,
MessageType.WARN,
"Use of fail-on-overlap within project.conf " +
"is deprecated. Consider using fatal-warnings instead."
diff --git a/src/buildstream/_scheduler/jobs/elementjob.py b/src/buildstream/_scheduler/jobs/elementjob.py
index 1384486..6bcb9de 100644
--- a/src/buildstream/_scheduler/jobs/elementjob.py
+++ b/src/buildstream/_scheduler/jobs/elementjob.py
@@ -68,14 +68,15 @@ class ElementJob(Job):
def __init__(self, *args, element, queue, action_cb, complete_cb, **kwargs):
super().__init__(*args, **kwargs)
self.set_name(element._get_full_name())
+ self.element_job = True
self.queue = queue
self._element = element
self._action_cb = action_cb # The action callable function
self._complete_cb = complete_cb # The complete callable function
- # Set the ID for logging purposes
- self.set_message_unique_id(element._unique_id)
- self.set_task_id(element._unique_id)
+ # Set the plugin element name & key for logging purposes
+ self.set_message_element_name(self.name)
+ self.set_message_element_key(self._element._get_display_key())
@property
def element(self):
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index 2e6c7bb..d80d1a9 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -131,6 +131,7 @@ class Job():
self.name = None # The name of the job, set by the job's subclass
self.action_name = action_name # The action name for the Queue
self.child_data = None # Data to be sent to the main process
+ self.element_job = False # If the job is an ElementJob
#
# Private members
@@ -147,8 +148,8 @@ class Job():
self._terminated = False # Whether this job has been explicitly terminated
self._logfile = logfile
- self._message_unique_id = None
- self._task_id = None
+ self._message_element_name = None # The plugin instance element name for messaging
+ self._message_element_key = None # The element key for messaging
# set_name()
#
@@ -174,8 +175,8 @@ class Job():
self._logfile,
self._max_retries,
self._tries,
- self._message_unique_id,
- self._task_id,
+ self._message_element_name,
+ self._message_element_key
)
# Make sure that picklability doesn't break, by exercising it during
@@ -311,36 +312,27 @@ class Job():
os.kill(self._process.pid, signal.SIGCONT)
self._suspended = False
- # set_message_unique_id()
+ # set_message_element_name()
#
- # This is called by Job subclasses to set the plugin ID
- # issuing the message (if an element is related to the Job).
+ # This is called by Job subclasses to set the plugin instance element
+ # name issuing the message (if an element is related to the Job).
#
# Args:
- # unique_id (int): The id to be supplied to the Message() constructor
+ # element_name (str): The element_name to be supplied to the Message() constructor
#
- def set_message_unique_id(self, unique_id):
- self._message_unique_id = unique_id
+ def set_message_element_name(self, element_name):
+ self._message_element_name = element_name
- # set_task_id()
+ # set_message_element_key()
#
- # This is called by Job subclasses to set a plugin ID
- # associated with the task at large (if any element is related
- # to the task).
- #
- # This will only be used in the child process running the task.
- #
- # The task ID helps keep messages in the frontend coherent
- # in the case that multiple plugins log in the context of
- # a single task (e.g. running integration commands should appear
- # in the frontend for the element being built, not the element
- # running the integration commands).
+ # This is called by Job subclasses to set the element
+ # key for the issuing message (if an element is related to the Job).
#
# Args:
- # task_id (int): The plugin identifier for this task
+ # element_key (tuple): The element_key tuple to be supplied to the Message() constructor
#
- def set_task_id(self, task_id):
- self._task_id = task_id
+ def set_message_element_key(self, element_key):
+ self._message_element_key = element_key
# message():
#
@@ -351,16 +343,18 @@ class Job():
# message_type (MessageType): The type of message to send
# message (str): The message
# kwargs: Remaining Message() constructor arguments, note that you can
- # override 'unique_id' this way.
+ # override 'element_name' and 'element_key' this way.
#
- def message(self, message_type, message, **kwargs):
+ def message(self, message_type, message, element_name=None, element_key=None, **kwargs):
+ kwargs['scheduler'] = True
kwargs['scheduler'] = True
- unique_id = self._message_unique_id
- if "unique_id" in kwargs:
- unique_id = kwargs["unique_id"]
- del kwargs["unique_id"]
+ # If default name & key values not provided, set as given job attributes
+ if element_name is None:
+ element_name = self._message_element_name
+ if element_key is None:
+ element_key = self._message_element_key
self._scheduler.context.messenger.message(
- Message(unique_id, message_type, message, **kwargs))
+ Message(message_type, message, element_name=element_name, element_key=element_key, **kwargs))
#######################################################
# Abstract Methods #
@@ -573,13 +567,16 @@ class Job():
# that should be used - should contain {pid}.
# max_retries (int): The maximum number of retries.
# tries (int): The number of retries so far.
-# message_unique_id (int): None, or the id to be supplied to the Message() constructor.
-# task_id (int): None, or the plugin identifier for this job.
+# message_element_name (str): None, or the plugin instance element name
+# to be supplied to the Message() constructor.
+# message_element_key (tuple): None, or the element display key tuple
+# to be supplied to the Message() constructor.
#
class ChildJob():
def __init__(
- self, action_name, messenger, logdir, logfile, max_retries, tries, message_unique_id, task_id):
+ self, action_name, messenger, logdir, logfile, max_retries, tries,
+ message_element_name, message_element_key):
self.action_name = action_name
@@ -588,8 +585,8 @@ class ChildJob():
self._logfile = logfile
self._max_retries = max_retries
self._tries = tries
- self._message_unique_id = message_unique_id
- self._task_id = task_id
+ self._message_element_name = message_element_name
+ self._message_element_key = message_element_key
self._queue = None
@@ -601,17 +598,20 @@ class ChildJob():
# Args:
# message_type (MessageType): The type of message to send
# message (str): The message
- # kwargs: Remaining Message() constructor arguments, note that you can
- # override 'unique_id' this way.
+ # kwargs: Remaining Message() constructor arguments, note
+ # element_key is set in _child_message_handler
+ # for front end display if not already set or explicitly
+ # overridden here.
#
- def message(self, message_type, message, **kwargs):
+ def message(self, message_type, message, element_name=None, element_key=None, **kwargs):
kwargs['scheduler'] = True
- unique_id = self._message_unique_id
- if "unique_id" in kwargs:
- unique_id = kwargs["unique_id"]
- del kwargs["unique_id"]
- self._messenger.message(
- Message(unique_id, message_type, message, **kwargs))
+ # If default name & key values not provided, set as given job attributes
+ if element_name is None:
+ element_name = self._message_element_name
+ if element_key is None:
+ element_key = self._message_element_key
+ self._messenger.message(Message(message_type, message, element_name=element_name,
+ element_key=element_key, **kwargs))
# send_message()
#
@@ -844,6 +844,8 @@ class ChildJob():
# frontend's main message handler in the context of a child task
# and performs local logging to the local log file before sending
# the message back to the parent process for further propagation.
+ # The related element display key is added to the message for
+ # widget rendering if not already set for an element childjob.
#
# Args:
# message (Message): The message to log
@@ -852,7 +854,12 @@ class ChildJob():
def _child_message_handler(self, message, is_silenced):
message.action_name = self.action_name
- message.task_id = self._task_id
+
+ # If no key has been set at this point, and the element job has
+ # a related key, set it. This is needed for messages going
+ # straight to the message handler from the child process.
+ if message.element_key is None and self._message_element_key:
+ message.element_key = self._message_element_key
# Send to frontend if appropriate
if is_silenced and (message.message_type not in unconditional_messages):
diff --git a/src/buildstream/_scheduler/queues/queue.py b/src/buildstream/_scheduler/queues/queue.py
index 79f1fa4..8c81ce2 100644
--- a/src/buildstream/_scheduler/queues/queue.py
+++ b/src/buildstream/_scheduler/queues/queue.py
@@ -341,7 +341,7 @@ class Queue():
# a message for the element they are processing
def _message(self, element, message_type, brief, **kwargs):
context = element._get_context()
- message = Message(element._unique_id, message_type, brief, **kwargs)
+ message = Message(message_type, brief, element_name=element._get_full_name(), **kwargs)
context.messenger.message(message)
def _element_log_path(self, element):
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 2dea1d4..4f668c6 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -28,7 +28,7 @@ from contextlib import contextmanager
# Local imports
from .resources import Resources, ResourceType
-from .jobs import JobStatus, CacheSizeJob, CleanupJob, ElementJob
+from .jobs import JobStatus, CacheSizeJob, CleanupJob
from .._profile import Topics, PROFILER
@@ -64,13 +64,15 @@ _ACTION_NAME_CACHE_SIZE = 'size'
# state: The state that can be made available to the frontend
# interrupt_callback: A callback to handle ^C
# ticker_callback: A callback call once per second
+# interactive_failure: If the session is set to handle interactive failures
#
class Scheduler():
def __init__(self, context,
start_time, state,
interrupt_callback=None,
- ticker_callback=None):
+ ticker_callback=None,
+ interactive_failure=False):
#
# Public members
@@ -92,6 +94,7 @@ class Scheduler():
self._suspendtime = None # Session time compensation for suspended state
self._queue_jobs = True # Whether we should continue to queue jobs
self._state = state
+ self._interactive_failure = interactive_failure # If the session is set to handle interactive failures
# State of cache management related jobs
self._cache_size_scheduled = False # Whether we have a cache size job scheduled
@@ -253,10 +256,11 @@ class Scheduler():
self._state.remove_task(job.action_name, job.name)
if status == JobStatus.FAIL:
- unique_id = None
- if isinstance(job, ElementJob):
- unique_id = job._element._unique_id
- self._state.fail_task(job.action_name, job.name, unique_id)
+ # If it's an elementjob, we want to compare against the failure messages
+ # and send the Element() instance if interactive failure handling. Note
+ # this may change if the frontend is run in a separate process for pickling
+ element = job._element if (job.element_job and self._interactive_failure) else None
+ self._state.fail_task(job.action_name, job.name, element_job=job.element_job, element=element)
# Now check for more jobs
self._sched()
diff --git a/src/buildstream/_state.py b/src/buildstream/_state.py
index 388ed81..2169467 100644
--- a/src/buildstream/_state.py
+++ b/src/buildstream/_state.py
@@ -208,8 +208,7 @@ class State():
# full_name (str): The full name of the task, distinguishing
# it from other tasks with the same action name
# e.g. an element's name.
- # unique_id (int): (optionally) the element's unique ID, if the failure
- # came from an element
+ # element_job (bool): (optionally) If an element job failed.
#
def register_task_failed_callback(self, callback):
self._task_failed_cbs.append(callback)
@@ -324,11 +323,12 @@ class State():
# full_name (str): The full name of the task, distinguishing
# it from other tasks with the same action name
# e.g. an element's name.
- # unique_id (int): (optionally) the element's unique ID, if the failure came from an element
+ # element_job (bool): (optionally) If an element job failed.
+ # element (Element): (optionally) The element instance if interactive handling
#
- def fail_task(self, action_name, full_name, unique_id=None):
+ def fail_task(self, action_name, full_name, element_job=False, element=None):
for cb in self._task_failed_cbs:
- cb(action_name, full_name, unique_id)
+ cb(action_name, full_name, element_job, element)
# _Task
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index cbd635a..5f12889 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -55,13 +55,15 @@ from . import Scope, Consistency
# session_start_callback (callable): A callback to invoke when the session starts
# interrupt_callback (callable): A callback to invoke when we get interrupted
# ticker_callback (callable): Invoked every second while running the scheduler
+# interactive_failure: If the session is set to handle interactive failures
#
class Stream():
def __init__(self, context, session_start, *,
session_start_callback=None,
interrupt_callback=None,
- ticker_callback=None):
+ ticker_callback=None,
+ interactive_failure=False):
#
# Public members
@@ -85,7 +87,8 @@ class Stream():
self._scheduler = Scheduler(context, session_start, self._state,
interrupt_callback=interrupt_callback,
- ticker_callback=ticker_callback)
+ ticker_callback=ticker_callback,
+ interactive_failure=interactive_failure)
self._first_non_track_queue = None
self._session_start_callback = session_start_callback
@@ -1235,7 +1238,7 @@ class Stream():
def _message(self, message_type, message, **kwargs):
args = dict(kwargs)
self._context.messenger.message(
- Message(None, message_type, message, **args))
+ Message(message_type, message, **args))
# _add_queue()
#
diff --git a/src/buildstream/plugin.py b/src/buildstream/plugin.py
index 506eba5..35e80dd 100644
--- a/src/buildstream/plugin.py
+++ b/src/buildstream/plugin.py
@@ -428,6 +428,9 @@ class Plugin():
Note: Informative messages tell the user something they might want
to know, like if refreshing an element caused it to change.
+ The instance full name of the plugin will be generated with the
+ message, this being the name of the given element, as opposed to
+ the class name of the underlying plugin __kind identifier.
"""
self.__message(MessageType.INFO, brief, detail=detail)
@@ -491,7 +494,7 @@ class Plugin():
self.call(... command which takes time ...)
"""
with self.__context.messenger.timed_activity(activity_name,
- unique_id=self._unique_id,
+ element_name=self._get_full_name(),
detail=detail,
silent_nested=silent_nested):
yield
@@ -733,7 +736,7 @@ class Plugin():
return (exit_code, output)
def __message(self, message_type, brief, **kwargs):
- message = Message(self._unique_id, message_type, brief, **kwargs)
+ message = Message(message_type, brief, element_name=self._get_full_name(), **kwargs)
self.__context.messenger.message(message)
def __note_command(self, output, *popenargs, **kwargs):
@@ -761,10 +764,12 @@ class Plugin():
def __set_full_name(self):
project = self.__project
+ # Set the name, depending on element or source plugin type
+ name = self._element_name if self.__type_tag == "source" else self.name # pylint: disable=no-member
if project.junction:
- return '{}:{}'.format(project.junction.name, self.name)
+ return '{}:{}'.format(project.junction.name, name)
else:
- return self.name
+ return name
# A local table for _prefix_warning()
diff --git a/src/buildstream/sandbox/_sandboxremote.py b/src/buildstream/sandbox/_sandboxremote.py
index c84bfa4..bbd23fc 100644
--- a/src/buildstream/sandbox/_sandboxremote.py
+++ b/src/buildstream/sandbox/_sandboxremote.py
@@ -107,7 +107,7 @@ class SandboxRemote(Sandbox):
self.operation_name = None
def info(self, msg):
- self._get_context().messenger.message(Message(None, MessageType.INFO, msg))
+ self._get_context().messenger.message(Message(MessageType.INFO, msg))
@staticmethod
def specs_from_config_node(config_node, basedir=None):
diff --git a/src/buildstream/sandbox/sandbox.py b/src/buildstream/sandbox/sandbox.py
index ece15c9..ac9d672 100644
--- a/src/buildstream/sandbox/sandbox.py
+++ b/src/buildstream/sandbox/sandbox.py
@@ -120,12 +120,12 @@ class Sandbox():
self.__allow_real_directory = kwargs['allow_real_directory']
self.__allow_run = True
- # Plugin ID for logging
+ # Plugin element full name for logging
plugin = kwargs.get('plugin', None)
if plugin:
- self.__plugin_id = plugin._unique_id
+ self.__element_name = plugin._get_full_name()
else:
- self.__plugin_id = None
+ self.__element_name = None
# Configuration from kwargs common to all subclasses
self.__config = kwargs['config']
@@ -563,12 +563,12 @@ class Sandbox():
return False
- # _get_plugin_id()
+ # _get_element_name()
#
- # Get the plugin's unique identifier
+ # Get the plugin's element full name
#
- def _get_plugin_id(self):
- return self.__plugin_id
+ def _get_element_name(self):
+ return self.__element_name
# _callback()
#
@@ -622,8 +622,7 @@ class Sandbox():
# details (str): optional, more detatils
def _issue_warning(self, message, detail=None):
self.__context.messenger.message(
- Message(None,
- MessageType.WARN,
+ Message(MessageType.WARN,
message,
detail=detail
)
@@ -649,7 +648,7 @@ class _SandboxBatch():
def execute_group(self, group):
if group.label:
context = self.sandbox._get_context()
- cm = context.messenger.timed_activity(group.label, unique_id=self.sandbox._get_plugin_id())
+ cm = context.messenger.timed_activity(group.label, element_name=self.sandbox._get_element_name())
else:
cm = contextlib.suppress()
@@ -659,8 +658,8 @@ class _SandboxBatch():
def execute_command(self, command):
if command.label:
context = self.sandbox._get_context()
- message = Message(self.sandbox._get_plugin_id(), MessageType.STATUS,
- 'Running command', detail=command.label)
+ message = Message(MessageType.STATUS, 'Running command', detail=command.label,
+ element_name=self.sandbox._get_element_name())
context.messenger.message(message)
exitcode = self.sandbox._run(command.command, self.flags, cwd=command.cwd, env=command.env)
diff --git a/src/buildstream/source.py b/src/buildstream/source.py
index b513fdb..c5fb614 100644
--- a/src/buildstream/source.py
+++ b/src/buildstream/source.py
@@ -308,10 +308,11 @@ class Source(Plugin):
def __init__(self, context, project, meta, *, alias_override=None, unique_id=None):
provenance = meta.config.get_provenance()
+ # Set element_name member before parent init, as needed for debug messaging
+ self.__element_name = meta.element_name # The name of the element owning this source
super().__init__("{}-{}".format(meta.element_name, meta.element_index),
context, project, provenance, "source", unique_id=unique_id)
- self.__element_name = meta.element_name # The name of the element owning this source
self.__element_index = meta.element_index # The index of the source in the owning element's source list
self.__element_kind = meta.element_kind # The kind of the element owning this source
self.__directory = meta.directory # Staging relative directory
@@ -1076,6 +1077,10 @@ class Source(Plugin):
length = min(len(key), context.log_key_length)
return key[:length]
+ @property
+ def _element_name(self):
+ return self.__element_name
+
# _get_args_for_child_job_pickling(self)
#
# Return data necessary to reconstruct this object in a child job process.
diff --git a/tests/frontend/logging.py b/tests/frontend/logging.py
index 6a17bf7..462af82 100644
--- a/tests/frontend/logging.py
+++ b/tests/frontend/logging.py
@@ -124,7 +124,7 @@ def test_failed_build_listing(cli, datafiles):
assert m.end() in failure_summary_range
assert len(matches) == 3 # each element should be matched once.
- # Note that if we mess up the 'unique_id' of Messages, they won't be printed
+ # Note that if we mess up the 'element_name' of Messages, they won't be printed
# with the name of the relevant element, e.g. 'testfail-1.bst'. Check that
# they have the name as expected.
pattern = r"\[..:..:..\] FAILURE testfail-.\.bst: Staged artifacts do not provide command 'sh'"
[buildstream] 13/19: Move subprocess machinery into a method
Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
not-in-ldap pushed a commit to branch phil/ui-split-refactor
in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit 8b32793fa8ce95273ee034829166e0697363d00e
Author: Phil Dawson <ph...@codethink.co.uk>
AuthorDate: Fri Jul 5 11:26:04 2019 +0100
Move subprocess machinery into a method
This gets round the pickling identity issues
---
src/buildstream/_stream.py | 128 ++++++++++++++++++++++++++++++---------------
1 file changed, 87 insertions(+), 41 deletions(-)
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 0fe2234..e4f6bc0 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -19,7 +19,6 @@
# Jürg Billeter <ju...@codethink.co.uk>
# Tristan Maat <tr...@codethink.co.uk>
-import asyncio
import itertools
import functools
import multiprocessing as mp
@@ -48,38 +47,44 @@ from . import utils, _yaml, _site
from . import Scope, Consistency
-# A decorator which runs the decorated method to be run in a subprocess
-def subprocessed(func):
-
- @functools.wraps(func)
- def _subprocessed(self, *args, **kwargs):
- assert self
- print("Args: {}".format([*args]))
- print("Kwargs: {}".format(list(kwargs.items())))
- assert not self._subprocess
-
- # TODO use functools to pass arguments to func to make target for subprocess
-
- # Start subprocessed work
- mp_context = mp.get_context(method='spawn')
- process_name = "stream-{}".format(func.__name__)
- target = functools.partial(func, self, *args, **kwargs)
- print("launching subprocess:", process_name)
- self._subprocess = mp_context.Process(target=target, name=process_name)
- self._subprocess.start()
-
- # TODO connect signal handlers
-
- # Run event loop. This event loop should exit once the
- # subprocessed work has completed
- print("Starting loop...")
- while not self._subprocess.exitcode:
- self._loop()
- print("Stopping loop...")
-
- # Return result of subprocessed function
-
- return _subprocessed
+def _subprocessed(self, *args, **kwargs):
+ assert self
+ print("Args: {}".format([*args]))
+ print("Kwargs: {}".format(list(kwargs.items())))
+ assert not self._subprocess
+
+ global notification_count
+ notification_count = 0
+ # TODO use functools to pass arguments to func to make target for subprocess
+
+ # Start subprocessed work
+ mp_context = mp.get_context(method='spawn')
+ process_name = "stream-{}".format(func.__name__)
+ print("launchinglaunching subprocess:", process_name)
+ print(func.__module__, func.__name__)
+ import buildstream
+ try:
+ assert func is buildstream._stream.Stream.build or func is Stream.build
+ except AssertionError:
+ print(func, func.__qualname__, func.__name__, func.__module__, id(func))
+ self._subprocess = mp_context.Process(target=func, args=args, kwargs=kwargs, name=process_name)
+ self._subprocess.start()
+
+ # TODO connect signal handlers
+
+ self._subprocess = mp_context.Process(target=func, args=args, kwargs=kwargs, name=process_name)
+
+ print("Starting loop...")
+ while not self._subprocess.exitcode:
+ self._loop()
+ print("Stopping loop...")
+
+ try:
+ while True:
+ notification = self.notification_queue.get()
+ self._scheduler_notification_handler(notification)
+ except queue.Empty:
+ pass
# Stream()
@@ -141,6 +146,46 @@ class Stream():
def init(self):
self._artifacts = self._context.artifactcache
self._sourcecache = self._context.sourcecache
+ print(Stream.build, Stream.build.__qualname__, Stream.build.__name__, Stream.build.__module__, id(Stream.build))
+
+
+ def run_in_subprocess(self, func, *args, **kwargs):
+ print("Args: {}".format([*args]))
+ print("Kwargs: {}".format(list(kwargs.items())))
+ assert not self._subprocess
+
+ global notification_count
+ notification_count = 0
+ # TODO use functools to pass arguments to func to make target for subprocess
+
+ # Start subprocessed work
+ mp_context = mp.get_context(method='fork')
+ process_name = "stream-{}".format(func.__name__)
+ print("launchinglaunching subprocess:", process_name)
+ print(func.__module__, func.__name__)
+ import buildstream
+ try:
+ assert func is buildstream._stream.Stream.build or func is Stream.build
+ except AssertionError:
+ print(func, func.__qualname__, func.__name__, func.__module__, id(func))
+ self._subprocess = mp_context.Process(target=func, args=args, kwargs=kwargs, name=process_name)
+ self._subprocess.start()
+
+ # TODO connect signal handlers
+
+ self._subprocess = mp_context.Process(target=func, args=args, kwargs=kwargs, name=process_name)
+
+ print("Starting loop...")
+ while not self._subprocess.exitcode:
+ self._loop()
+ print("Stopping loop...")
+
+ try:
+ while True:
+ notification = self.notification_queue.get()
+ self._scheduler_notification_handler(notification)
+ except queue.Empty:
+ pass
# cleanup()
#
@@ -265,6 +310,8 @@ class Stream():
return element._shell(scope, directory, mounts=mounts, isolate=isolate, prompt=prompt, command=command,
usebuildtree=buildtree)
+ def build(self, *args, **kwargs):
+ self.run_in_subprocess(self._build, *args, **kwargs)
# build()
#
# Builds (assembles) elements in the pipeline.
@@ -282,14 +329,13 @@ class Stream():
# If `remote` specified as None, then regular configuration will be used
# to determine where to push artifacts to.
#
- @subprocessed
- def build(self, targets, *,
- track_targets=None,
- track_except=None,
- track_cross_junctions=False,
- ignore_junction_targets=False,
- build_all=False,
- remote=None):
+ def _build(self, targets, *,
+ track_targets=None,
+ track_except=None,
+ track_cross_junctions=False,
+ ignore_junction_targets=False,
+ build_all=False,
+ remote=None):
if build_all:
selection = PipelineSelection.ALL
[buildstream] 14/19: fixup! Move subprocess machinery into a method
Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
not-in-ldap pushed a commit to branch phil/ui-split-refactor
in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit f39f4ef0e541f50911fb695e7848a2e6482f2011
Author: Phil Dawson <ph...@codethink.co.uk>
AuthorDate: Fri Jul 5 14:21:30 2019 +0100
fixup! Move subprocess machinery into a method
---
src/buildstream/_stream.py | 94 +++++++++++-----------------------------------
1 file changed, 22 insertions(+), 72 deletions(-)
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index e4f6bc0..1de8364 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -47,46 +47,6 @@ from . import utils, _yaml, _site
from . import Scope, Consistency
-def _subprocessed(self, *args, **kwargs):
- assert self
- print("Args: {}".format([*args]))
- print("Kwargs: {}".format(list(kwargs.items())))
- assert not self._subprocess
-
- global notification_count
- notification_count = 0
- # TODO use functools to pass arguments to func to make target for subprocess
-
- # Start subprocessed work
- mp_context = mp.get_context(method='spawn')
- process_name = "stream-{}".format(func.__name__)
- print("launchinglaunching subprocess:", process_name)
- print(func.__module__, func.__name__)
- import buildstream
- try:
- assert func is buildstream._stream.Stream.build or func is Stream.build
- except AssertionError:
- print(func, func.__qualname__, func.__name__, func.__module__, id(func))
- self._subprocess = mp_context.Process(target=func, args=args, kwargs=kwargs, name=process_name)
- self._subprocess.start()
-
- # TODO connect signal handlers
-
- self._subprocess = mp_context.Process(target=func, args=args, kwargs=kwargs, name=process_name)
-
- print("Starting loop...")
- while not self._subprocess.exitcode:
- self._loop()
- print("Stopping loop...")
-
- try:
- while True:
- notification = self.notification_queue.get()
- self._scheduler_notification_handler(notification)
- except queue.Empty:
- pass
-
-
# Stream()
#
# This is the main, toplevel calling interface in BuildStream core.
@@ -146,15 +106,15 @@ class Stream():
def init(self):
self._artifacts = self._context.artifactcache
self._sourcecache = self._context.sourcecache
- print(Stream.build, Stream.build.__qualname__, Stream.build.__name__, Stream.build.__module__, id(Stream.build))
-
+ print(Stream.build, Stream.build.__qualname__, Stream.build.__name__, Stream.build.__module__,
+ id(Stream.build))
def run_in_subprocess(self, func, *args, **kwargs):
print("Args: {}".format([*args]))
print("Kwargs: {}".format(list(kwargs.items())))
assert not self._subprocess
- global notification_count
+ global notification_count
notification_count = 0
# TODO use functools to pass arguments to func to make target for subprocess
@@ -162,30 +122,22 @@ class Stream():
mp_context = mp.get_context(method='fork')
process_name = "stream-{}".format(func.__name__)
print("launchinglaunching subprocess:", process_name)
- print(func.__module__, func.__name__)
- import buildstream
- try:
- assert func is buildstream._stream.Stream.build or func is Stream.build
- except AssertionError:
- print(func, func.__qualname__, func.__name__, func.__module__, id(func))
self._subprocess = mp_context.Process(target=func, args=args, kwargs=kwargs, name=process_name)
self._subprocess.start()
# TODO connect signal handlers
-
- self._subprocess = mp_context.Process(target=func, args=args, kwargs=kwargs, name=process_name)
-
- print("Starting loop...")
- while not self._subprocess.exitcode:
+ while self._subprocess.exitcode is not None:
+ self._subprocess.join(0.1)
self._loop()
print("Stopping loop...")
- try:
- while True:
- notification = self.notification_queue.get()
- self._scheduler_notification_handler(notification)
- except queue.Empty:
- pass
+ # try:
+ # while True:
+ # notification = self._notification_queue.get_nowait()
+ # self._scheduler_notification_handler(notification)
+ # except queue.Empty:
+ # print("Finished processing notifications")
+ # pass
# cleanup()
#
@@ -312,6 +264,7 @@ class Stream():
def build(self, *args, **kwargs):
self.run_in_subprocess(self._build, *args, **kwargs)
+
# build()
#
# Builds (assembles) elements in the pipeline.
@@ -1701,15 +1654,12 @@ class Stream():
# work to a subprocess with the @subprocessed decorator
def _loop(self):
assert self._notification_queue
-
- # Check for new messages
- try:
- notification = self._notification_queue.get(block=True, timeout=0.1)
- except queue.Empty:
- notification = None
- print("queue empty, continuing...")
-
- # Process new messages
- if notification:
- print("handling notifications")
- self._scheduler_notification_handler(notification)
+ # Check for and process new messages
+ while True:
+ try:
+ notification = self._notification_queue.get_nowait()
+ print("handling notifications")
+ self._scheduler_notification_handler(notification)
+ except queue.Empty:
+ notification = None
+ break
[buildstream] 04/19: _message.py: Use element_name & element_key
instead of unique_id
Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
not-in-ldap pushed a commit to branch phil/ui-split-refactor
in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit b9bcb00b1719396c40b75b98d9c50de83ddbf42a
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Wed Jul 31 16:31:47 2019 +0100
_message.py: Use element_name & element_key instead of unique_id
Adding the element full name and display key into all element related
messages removes the need to look up the plugintable via a plugin
unique_id just to retrieve the same values for logging and widget
frontend display. Relying on plugintable state is also incompatible
if the frontend will be running in a different process, as it will
exist in multiple states. An Element instance is passed exclusively
if handling interactive failures.
The element full name is now displayed instead of the unique_id,
such as in the debugging widget. It is also displayed in place of
'name' (i.e including any junction prepend) to be more informative.
---
src/buildstream/_frontend/app.py | 1 +
1 file changed, 1 insertion(+)
diff --git a/src/buildstream/_frontend/app.py b/src/buildstream/_frontend/app.py
index e1b1d8d..76e3bc7 100644
--- a/src/buildstream/_frontend/app.py
+++ b/src/buildstream/_frontend/app.py
@@ -586,6 +586,7 @@ class App():
"unable to retrieve failure message for element {}\n\n\n\n\n"
.format(full_name), err=True)
else:
+ # Note element will be given if in interactive failure mode
self._handle_failure(element, queue, failure)
else:
[buildstream] 09/19: Psuedocode for subprocess context manager
Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
not-in-ldap pushed a commit to branch phil/ui-split-refactor
in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit e0561a73703dfbb0ec6c82bbdafc63ccea2997ad
Author: Phil Dawson <ph...@codethink.co.uk>
AuthorDate: Thu Jun 20 11:12:30 2019 +0100
Psuedocode for subprocess context manager
---
src/buildstream/_stream.py | 14 ++++++++++++++
1 file changed, 14 insertions(+)
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 2d451c6..4a1edcf 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -1609,3 +1609,17 @@ class Stream():
# a new use-case arises.
#
raise TypeError("Stream objects should not be pickled.")
+
+ # TODO
+ # Causes the decorated method to be run in a subprocess
+ @contextmanager
+ def subprocessed(self, func, *args, **kwargs):
+ pass
+ # Set up event loop
+
+ # Start subprocessed work
+
+ # Run event loop. This event loop should exit once the
+ # subprocessed work has completed
+
+ # Return result of subprocessed function
[buildstream] 07/19: Refer to stream-scheduler communication as
notifications
Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
not-in-ldap pushed a commit to branch phil/ui-split-refactor
in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit ed0eabe3ccac556f5ebe3783cb8538bba5ab0f16
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Tue Jun 18 13:02:15 2019 +0100
Refer to stream-scheduler communication as notifications
---
src/buildstream/_scheduler/scheduler.py | 38 ++++++++++++++++-----------------
1 file changed, 19 insertions(+), 19 deletions(-)
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 5bf0764..9e120e4 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -98,7 +98,7 @@ class Notification:
class Scheduler():
def __init__(self, context,
- start_time, state, message_handler,
+ start_time, state, notification_handler,
interrupt_callback=None,
ticker_callback=None,
interactive_failure=False):
@@ -131,8 +131,8 @@ class Scheduler():
self._cleanup_scheduled = False # Whether we have a cleanup job scheduled
self._cleanup_running = None # A running CleanupJob, or None
- # Callback to send messages to report back to the Scheduler's owner
- self.message = message_handler
+ # Callback to send notifications to report back to the Scheduler's owner
+ self.notify = notification_handler
# Whether our exclusive jobs, like 'cleanup' are currently already
# waiting or active.
@@ -299,13 +299,13 @@ class Scheduler():
# this may change if the frontend is run in a separate process for pickling
element = job._element if (job.element_job and self._interactive_failure) else None
- message = Notification(NotificationType.JOB_COMPLETE,
- full_name=job.name,
- job_action=job.action_name,
- job_status=status,
- failed_element=job.element_job,
- element=element)
- self.message(message)
+ notification = Notification(NotificationType.JOB_COMPLETE,
+ full_name=job.name,
+ job_action=job.action_name,
+ job_status=status,
+ failed_element=job.element_job,
+ element=element)
+ self.notify(notification)
if process_jobs:
# Now check for more jobs
@@ -365,11 +365,11 @@ class Scheduler():
#
def _start_job(self, job):
self._active_jobs.append(job)
- message = Notification(NotificationType.JOB_START,
- full_name=job.name,
- job_action=job.action_name,
- elapsed_time=self.elapsed_time())
- self.message(message)
+ notification = Notification(NotificationType.JOB_START,
+ full_name=job.name,
+ job_action=job.action_name,
+ elapsed_time=self.elapsed_time())
+ self.notify(notification)
job.start()
# Callback for the cache size job
@@ -588,8 +588,8 @@ class Scheduler():
if self.terminated:
return
- message = Notification(NotificationType.INTERRUPT)
- self.message(message)
+ notification = Notification(NotificationType.INTERRUPT)
+ self.notify(notification)
# _terminate_event():
#
@@ -648,8 +648,8 @@ class Scheduler():
# Regular timeout for driving status in the UI
def _tick(self):
- message = Notification(NotificationType.TICK)
- self.message(message)
+ notification = Notification(NotificationType.TICK)
+ self.notify(notification)
self.loop.call_later(1, self._tick)
def __getstate__(self):
[buildstream] 11/19: WIP: Add a way to run stream methods in a
subprocess
Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
not-in-ldap pushed a commit to branch phil/ui-split-refactor
in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit d12e62d7aeb8595133b0c732a4b0038ae2479a24
Author: Phil Dawson <ph...@codethink.co.uk>
AuthorDate: Thu Jul 4 09:55:51 2019 +0100
WIP: Add a way to run stream methods in a subprocess
Currently the frontend is getting stuck in an infinite loop.
Also contains debugging print statements which will need to be
stripped out.
---
src/buildstream/_frontend/app.py | 153 ++++++++++++++++----------------
src/buildstream/_scheduler/scheduler.py | 6 +-
src/buildstream/_stream.py | 67 ++++++++++----
3 files changed, 129 insertions(+), 97 deletions(-)
diff --git a/src/buildstream/_frontend/app.py b/src/buildstream/_frontend/app.py
index 9f90938..90070af 100644
--- a/src/buildstream/_frontend/app.py
+++ b/src/buildstream/_frontend/app.py
@@ -596,84 +596,83 @@ class App():
def _handle_failure(self, element, queue, failure):
- # Handle non interactive mode setting of what to do when a job fails.
- if not self._interactive_failures:
+ # # Handle non interactive mode setting of what to do when a job fails.
+ # if not self._interactive_failures:
- if self.context.sched_error_action == _SchedulerErrorAction.TERMINATE:
- self.stream.terminate()
- elif self.context.sched_error_action == _SchedulerErrorAction.QUIT:
- self.stream.quit()
- elif self.context.sched_error_action == _SchedulerErrorAction.CONTINUE:
- pass
- return
-
- assert False
- # Interactive mode for element failures
- with self._interrupted():
-
- summary = ("\n{} failure on element: {}\n".format(failure.action_name, element.name) +
- "\n" +
- "Choose one of the following options:\n" +
- " (c)ontinue - Continue queueing jobs as much as possible\n" +
- " (q)uit - Exit after all ongoing jobs complete\n" +
- " (t)erminate - Terminate any ongoing jobs and exit\n" +
- " (r)etry - Retry this job\n")
- if failure.logfile:
- summary += " (l)og - View the full log file\n"
- if failure.sandbox:
- summary += " (s)hell - Drop into a shell in the failed build sandbox\n"
- summary += "\nPressing ^C will terminate jobs and exit\n"
-
- choices = ['continue', 'quit', 'terminate', 'retry']
- if failure.logfile:
- choices += ['log']
- if failure.sandbox:
- choices += ['shell']
-
- choice = ''
- while choice not in ['continue', 'quit', 'terminate', 'retry']:
- click.echo(summary, err=True)
-
- self._notify("BuildStream failure", "{} on element {}"
- .format(failure.action_name, element.name))
-
- try:
- choice = click.prompt("Choice:", default='continue', err=True,
- value_proc=_prefix_choice_value_proc(choices))
- except click.Abort:
- # Ensure a newline after automatically printed '^C'
- click.echo("", err=True)
- choice = 'terminate'
-
- # Handle choices which you can come back from
- #
- assert choice != 'shell' # This won't work for now
- if choice == 'shell':
- click.echo("\nDropping into an interactive shell in the failed build sandbox\n", err=True)
- try:
- prompt = self.shell_prompt(element)
- self.stream.shell(element, Scope.BUILD, prompt, isolate=True, usebuildtree='always')
- except BstError as e:
- click.echo("Error while attempting to create interactive shell: {}".format(e), err=True)
- elif choice == 'log':
- with open(failure.logfile, 'r') as logfile:
- content = logfile.read()
- click.echo_via_pager(content)
-
- if choice == 'terminate':
- click.echo("\nTerminating all jobs\n", err=True)
- self.stream.terminate()
- else:
- if choice == 'quit':
- click.echo("\nCompleting ongoing tasks before quitting\n", err=True)
- self.stream.quit()
- elif choice == 'continue':
- click.echo("\nContinuing with other non failing elements\n", err=True)
- elif choice == 'retry':
- click.echo("\nRetrying failed job\n", err=True)
- # FIXME: Outstandingly nasty modification of core state
- queue._task_group.failed_tasks.remove(element._get_full_name())
- queue.enqueue([element])
+ if self.context.sched_error_action == 'terminate':
+ self.stream.terminate()
+ elif self.context.sched_error_action == 'quit':
+ self.stream.quit()
+ elif self.context.sched_error_action == 'continue':
+ pass
+ return
+
+ # assert False
+ # # Interactive mode for element failures
+ # with self._interrupted():
+
+ # summary = ("\n{} failure on element: {}\n".format(failure.action_name, element.name) +
+ # "\n" +
+ # "Choose one of the following options:\n" +
+ # " (c)ontinue - Continue queueing jobs as much as possible\n" +
+ # " (q)uit - Exit after all ongoing jobs complete\n" +
+ # " (t)erminate - Terminate any ongoing jobs and exit\n" +
+ # " (r)etry - Retry this job\n")
+ # if failure.logfile:
+ # summary += " (l)og - View the full log file\n"
+ # if failure.sandbox:
+ # summary += " (s)hell - Drop into a shell in the failed build sandbox\n"
+ # summary += "\nPressing ^C will terminate jobs and exit\n"
+
+ # choices = ['continue', 'quit', 'terminate', 'retry']
+ # if failure.logfile:
+ # choices += ['log']
+ # if failure.sandbox:
+ # choices += ['shell']
+
+ # choice = ''
+ # while choice not in ['continue', 'quit', 'terminate', 'retry']:
+ # click.echo(summary, err=True)
+
+ # self._notify("BuildStream failure", "{} on element {}"
+ # .format(failure.action_name, element.name))
+
+ # try:
+ # choice = click.prompt("Choice:", default='continue', err=True,
+ # value_proc=_prefix_choice_value_proc(choices))
+ # except click.Abort:
+ # # Ensure a newline after automatically printed '^C'
+ # click.echo("", err=True)
+ # choice = 'terminate'
+
+ # # Handle choices which you can come back from
+ # #
+ # assert choice != 'shell' # This won't work for now
+ # if choice == 'shell':
+ # click.echo("\nDropping into an interactive shell in the failed build sandbox\n", err=True)
+ # try:
+ # prompt = self.shell_prompt(element)
+ # self.stream.shell(element, Scope.BUILD, prompt, isolate=True, usebuildtree='always')
+ # except BstError as e:
+ # click.echo("Error while attempting to create interactive shell: {}".format(e), err=True)
+ # elif choice == 'log':
+ # with open(failure.logfile, 'r') as logfile:
+ # content = logfile.read()
+ # click.echo_via_pager(content)
+
+ # if choice == 'terminate':
+ # click.echo("\nTerminating all jobs\n", err=True)
+ # self.stream.terminate()
+ # else:
+ # if choice == 'quit':
+ # click.echo("\nCompleting ongoing tasks before quitting\n", err=True)
+ # self.stream.quit()
+ # elif choice == 'continue':
+ # click.echo("\nContinuing with other non failing elements\n", err=True)
+ # elif choice == 'retry':
+ # click.echo("\nRetrying failed job\n", err=True)
+ # queue.failed_elements.remove(element)
+ # queue.enqueue([element])
#
# Print the session heading if we've loaded a pipeline and there
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 14ecf30..2e9c740 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -52,6 +52,7 @@ class NotificationType(enum.Enum):
JOB_START = "job_start"
JOB_COMPLETE = "job_complete"
TICK = "tick"
+ EXCEPTION = "exception"
class Notification:
@@ -64,7 +65,9 @@ class Notification:
job_status=None,
failed_element=False,
elapsed_time=None,
- element=None):
+ element=None,
+ exception=None):
+
self.notification_type = notification_type
self.full_name = full_name
self.job_action = job_action
@@ -72,6 +75,7 @@ class Notification:
self.failed_element = failed_element
self.elapsed_time = elapsed_time
self.element = element
+ self.exception = exception
# Scheduler()
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index e8b3931..d2a4d08 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -19,6 +19,7 @@
# Jürg Billeter <ju...@codethink.co.uk>
# Tristan Maat <tr...@codethink.co.uk>
+import asyncio
import itertools
import functools
import multiprocessing as mp
@@ -31,6 +32,7 @@ import tarfile
import tempfile
from contextlib import contextmanager, suppress
from fnmatch import fnmatch
+import queue
from ._artifact import Artifact
from ._artifactelement import verify_artifact_ref, ArtifactElement
@@ -46,6 +48,40 @@ from . import utils, _yaml, _site
from . import Scope, Consistency
+# A decorator which causes the decorated method to be run in a subprocess
+def subprocessed(func):
+
+ @functools.wraps(func)
+ def _subprocessed(self, *args, **kwargs):
+ assert self
+ print("Args: {}".format([*args]))
+ print("Kwargs: {}".format(list(kwargs.items())))
+ assert not self._subprocess
+
+ # TODO use functools to pass arguments to func to make target for subprocess
+
+ # Start subprocessed work
+ mp_context = mp.get_context(method='spawn')
+ process_name = "stream-{}".format(func.__name__)
+ target = functools.partial(func, self, *args, **kwargs)
+ print("launching subprocess:", process_name)
+ self._subprocess = mp_context.Process(target=target, name=process_name)
+ self._subprocess.run()
+
+ # TODO connect signal handlers
+
+ # Run event loop. This event loop should exit once the
+ # subprocessed work has completed
+ print("Starting loop...")
+ while not self._subprocess.exitcode:
+ self._loop()
+ print("Stopping loop...")
+
+ # Return result of subprocessed function
+
+ return _subprocessed
+
+
# Stream()
#
# This is the main, toplevel calling interface in BuildStream core.
@@ -77,6 +113,7 @@ class Stream():
#
# Private members
#
+ self._subprocess = None
self._notification_queue = mp.Queue()
self._context = context
self._artifacts = None
@@ -245,6 +282,7 @@ class Stream():
# If `remote` specified as None, then regular configuration will be used
# to determine where to push artifacts to.
#
+ @subprocessed
def build(self, targets, *,
track_targets=None,
track_except=None,
@@ -1592,6 +1630,9 @@ class Stream():
else:
unique_id = None
self._state.fail_task(notification.job_action, notification.full_name, unique_id)
+ elif notification.notification_type == NotificationType.EXCEPTION:
+ # TODO
+ pass
else:
raise StreamError("Unreccognised notification type recieved")
@@ -1610,31 +1651,19 @@ class Stream():
#
raise TypeError("Stream objects should not be pickled.")
- # TODO
- # Causes the decorated method to be run in a subprocess
- @contextmanager
- def subprocessed(self, func, *args, **kwargs):
- pass
- # Set up event loop
-
- # Start subprocessed work
-
- # Run event loop. This event loop should exit once the
- # subprocessed work has completed
-
- # Return result of subprocessed function
-
# The code to be run by the Stream's event loop while delegating
- # work to a subprocess with the @subprocessed
+ # work to a subprocess with the @subprocessed decorator
def _loop(self):
assert self._notification_queue
- # Check that the subprocessed work has not finished
- # TODO
# Check for new messages
- notification = self._notification_queue.get(block=True, timeout=0.1)
+ try:
+ notification = self._notification_queue.get(block=True, timeout=0.1)
+ except queue.Empty:
+ notification = None
+ print("queue empty, continuing...")
# Process new messages
if notification:
+ print("handling notifications")
self._scheduler_notification_handler(notification)
-
[buildstream] 01/19: element.py: Remove redundant second call to
_get_cache_key()
Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
not-in-ldap pushed a commit to branch phil/ui-split-refactor
in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit 456da5723d447789260a7c970dcfca736567e240
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Wed Jul 31 15:39:15 2019 +0100
element.py: Remove redundant second call to _get_cache_key()
---
src/buildstream/element.py | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/src/buildstream/element.py b/src/buildstream/element.py
index 21c38bc..a4404fc 100644
--- a/src/buildstream/element.py
+++ b/src/buildstream/element.py
@@ -1242,7 +1242,7 @@ class Element(Plugin):
if not cache_key:
cache_key = "{:?<64}".format('')
- elif self._get_cache_key() == self.__strict_cache_key:
+ elif cache_key == self.__strict_cache_key:
# Strong cache key used in this session matches cache key
# that would be used in strict build mode
dim_key = False
[buildstream] 19/19: Add workarounds for queue querying in main
process
Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
not-in-ldap pushed a commit to branch phil/ui-split-refactor
in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit dc863015320f1ff9b749a678a2a1ce4b83c006e0
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Tue Jul 16 16:48:38 2019 +0100
Add workarounds for queue querying in main process
---
src/buildstream/_frontend/app.py | 3 ++-
src/buildstream/_scheduler/scheduler.py | 29 ++++++++++++++++++-----------
src/buildstream/_stream.py | 13 ++++++-------
3 files changed, 26 insertions(+), 19 deletions(-)
diff --git a/src/buildstream/_frontend/app.py b/src/buildstream/_frontend/app.py
index 90070af..7fd10b4 100644
--- a/src/buildstream/_frontend/app.py
+++ b/src/buildstream/_frontend/app.py
@@ -570,8 +570,9 @@ class App():
if not self.stream.terminated:
if element_job:
# look-up queue
+ # Issue with pickling a queue object, so for now only pass action names
for q in self.stream.queues:
- if q.action_name == action_name:
+ if q == action_name:
queue = q
assert queue, "Job action {} does not have a corresponding queue".format(action_name)
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 6649865..44ebeef 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -26,6 +26,7 @@ from itertools import chain
import signal
import datetime
from contextlib import contextmanager
+import queue
# Local imports
from .resources import Resources, ResourceType
@@ -55,6 +56,7 @@ class NotificationType(enum.Enum):
EXCEPTION = "exception"
TASK_ERROR = "task_error"
SCHED_TERMINATE = "sched_terminate"
+ QUEUES = "queues"
class Notification:
@@ -70,7 +72,8 @@ class Notification:
element=None,
exception=None,
domain=None,
- reason=None):
+ reason=None,
+ queues=None):
self.notification_type = notification_type
self.full_name = full_name
@@ -82,6 +85,7 @@ class Notification:
self.exception = exception
self.domain = domain
self.reason = reason
+ self.queues = queues
# Scheduler()
@@ -175,6 +179,14 @@ class Scheduler():
# Hold on to the queues to process
self.queues = queues
+ # Report to the main process which queues are in session,
+ # for now a list of action_names as pickling queues is
+ # causing errors. Will need actual queue object or bidirectional
+ # notification queue for error handling later.
+ queue_list = [q.action_name for q in self.queues]
+ notifcation = Notification(NotificationType.QUEUES, queues=queue_list)
+ self._notify(notifcation)
+
# Ensure that we have a fresh new event loop, in case we want
# to run another test in this thread.
self.loop = asyncio.new_event_loop()
@@ -294,20 +306,17 @@ class Scheduler():
# queue (Queue): The Queue holding a complete job
# job (Job): The completed Job
# status (JobStatus): The status of the completed job
- # process_jobs (bool): If the scheduler should also process the
- # job, else just generate the notification
#
- def job_completed(self, job, status, process_jobs=True):
+ def job_completed(self, job, status):
- if process_jobs:
- # Remove from the active jobs list
- self._active_jobs.remove(job)
+ self._active_jobs.remove(job)
+ element = None
if status == JobStatus.FAIL:
# If it's an elementjob, we want to compare against the failure messages
# and send the Element() instance if interactive failure handling. Note
# this may change if the frontend is run in a separate process for pickling
- element = job._element if (job.element_job and self._interactive_failure) else None
+ element = job._element if (job.element_job and self._interactive_failure) else element
notification = Notification(NotificationType.JOB_COMPLETE,
full_name=job.name,
@@ -317,9 +326,7 @@ class Scheduler():
element=element)
self._notify(notification)
- if process_jobs:
- # Now check for more jobs
- self._sched()
+ self._sched()
# check_cache_size():
#
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 4de975e..e0f0842 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -38,7 +38,7 @@ from ._artifactelement import verify_artifact_ref, ArtifactElement
from ._exceptions import StreamError, ImplError, BstError, ArtifactElementError, ArtifactError, set_last_task_error
from ._message import Message, MessageType
from ._scheduler import Scheduler, SchedStatus, TrackQueue, FetchQueue, \
- SourcePushQueue, BuildQueue, PullQueue, ArtifactPushQueue, NotificationType, JobStatus
+ SourcePushQueue, BuildQueue, PullQueue, ArtifactPushQueue, NotificationType, JobStatus, Notification
from ._pipeline import Pipeline, PipelineSelection
from ._profile import Topics, PROFILER
from ._state import State
@@ -114,7 +114,6 @@ class Stream():
try:
func(*args, **kwargs)
except Exception as e:
- from ._scheduler.scheduler import Notification, NotificationType
queue.put(Notification(NotificationType.EXCEPTION, exception=e))
def run_in_subprocess(self, func, *args, **kwargs):
@@ -367,6 +366,7 @@ class Stream():
if track_elements:
self._enqueue_plan(track_elements, queue=track_queue)
self._enqueue_plan(elements)
+
self._run()
# fetch()
@@ -1646,15 +1646,14 @@ class Stream():
elif notification.notification_type == NotificationType.JOB_COMPLETE:
self._state.remove_task(notification.job_action, notification.full_name)
if notification.job_status == JobStatus.FAIL:
- if notification.failed_element:
- unique_id = notification.full_name
- else:
- unique_id = None
- self._state.fail_task(notification.job_action, notification.full_name, unique_id)
+ self._state.fail_task(notification.job_action, notification.full_name,
+ notification.failed_element, notification.element)
elif notification.notification_type == NotificationType.EXCEPTION:
raise notification.exception
elif notification.notification_type == NotificationType.TASK_ERROR:
set_last_task_error(notification.domain, notification.reason)
+ elif notification.notification_type == NotificationType.QUEUES:
+ self.queues = notification.queues
else:
raise StreamError("Unreccognised notification type recieved")
[buildstream] 08/19: Send scheduler notifications over a
multiprocessing queue
Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
not-in-ldap pushed a commit to branch phil/ui-split-refactor
in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit e39c94064de749d31bf41987fccaa19fc129a975
Author: Phil Dawson <ph...@codethink.co.uk>
AuthorDate: Thu Jun 20 10:18:49 2019 +0100
Send scheduler notifications over a multiprocessing queue
---
src/buildstream/_scheduler/scheduler.py | 17 ++++++++++-------
src/buildstream/_stream.py | 4 +++-
2 files changed, 13 insertions(+), 8 deletions(-)
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 9e120e4..14ecf30 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -98,7 +98,7 @@ class Notification:
class Scheduler():
def __init__(self, context,
- start_time, state, notification_handler,
+ start_time, state, notification_queue,
interrupt_callback=None,
ticker_callback=None,
interactive_failure=False):
@@ -131,8 +131,8 @@ class Scheduler():
self._cleanup_scheduled = False # Whether we have a cleanup job scheduled
self._cleanup_running = None # A running CleanupJob, or None
- # Callback to send notifications to report back to the Scheduler's owner
- self.notify = notification_handler
+ # Queue used to send notifications back to the Scheduler's owner
+ self._notification_queue = notification_queue
# Whether our exclusive jobs, like 'cleanup' are currently already
# waiting or active.
@@ -305,7 +305,7 @@ class Scheduler():
job_status=status,
failed_element=job.element_job,
element=element)
- self.notify(notification)
+ self._notify(notification)
if process_jobs:
# Now check for more jobs
@@ -369,7 +369,7 @@ class Scheduler():
full_name=job.name,
job_action=job.action_name,
elapsed_time=self.elapsed_time())
- self.notify(notification)
+ self._notify(notification)
job.start()
# Callback for the cache size job
@@ -589,7 +589,7 @@ class Scheduler():
return
notification = Notification(NotificationType.INTERRUPT)
- self.notify(notification)
+ self._notify(notification)
# _terminate_event():
#
@@ -649,9 +649,12 @@ class Scheduler():
# Regular timeout for driving status in the UI
def _tick(self):
notification = Notification(NotificationType.TICK)
- self.notify(notification)
+ self._notify(notification)
self.loop.call_later(1, self._tick)
+ def _notify(self, notification):
+ self._notification_queue.put(notification)
+
def __getstate__(self):
# The only use-cases for pickling in BuildStream at the time of writing
# are enabling the 'spawn' method of starting child processes, and
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 45bb41b..2d451c6 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -21,6 +21,7 @@
import itertools
import functools
+import multiprocessing as mp
import os
import sys
import stat
@@ -76,6 +77,7 @@ class Stream():
#
# Private members
#
+ self._notification_queue = mp.Queue()
self._context = context
self._artifacts = None
self._sourcecache = None
@@ -85,7 +87,7 @@ class Stream():
context.messenger.set_state(self._state)
- self._scheduler = Scheduler(context, session_start, self._state, self._scheduler_notification_handler,
+ self._scheduler = Scheduler(context, session_start, self._state, self._notification_queue,
interrupt_callback=interrupt_callback,
ticker_callback=ticker_callback,
interactive_failure=interactive_failure)
[buildstream] 05/19: WIP: Refactor scheduler-frontend communication
Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
not-in-ldap pushed a commit to branch phil/ui-split-refactor
in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit b7b445cdb0de0223015c71506d8689135151c966
Author: Phil Dawson <ph...@codethink.co.uk>
AuthorDate: Thu Jun 13 17:47:04 2019 +0100
WIP: Refactor scheduler-frontend communication
---
src/buildstream/_frontend/app.py | 2 +
src/buildstream/_scheduler/__init__.py | 2 +-
src/buildstream/_scheduler/scheduler.py | 76 +++++++++++++++++++++++++--------
src/buildstream/_stream.py | 25 ++++++++++-
4 files changed, 85 insertions(+), 20 deletions(-)
diff --git a/src/buildstream/_frontend/app.py b/src/buildstream/_frontend/app.py
index 76e3bc7..9f90938 100644
--- a/src/buildstream/_frontend/app.py
+++ b/src/buildstream/_frontend/app.py
@@ -607,6 +607,7 @@ class App():
pass
return
+ assert False
# Interactive mode for element failures
with self._interrupted():
@@ -646,6 +647,7 @@ class App():
# Handle choices which you can come back from
#
+ assert choice != 'shell' # This won't work for now
if choice == 'shell':
click.echo("\nDropping into an interactive shell in the failed build sandbox\n", err=True)
try:
diff --git a/src/buildstream/_scheduler/__init__.py b/src/buildstream/_scheduler/__init__.py
index d2f458f..d689d6e 100644
--- a/src/buildstream/_scheduler/__init__.py
+++ b/src/buildstream/_scheduler/__init__.py
@@ -26,5 +26,5 @@ from .queues.buildqueue import BuildQueue
from .queues.artifactpushqueue import ArtifactPushQueue
from .queues.pullqueue import PullQueue
-from .scheduler import Scheduler, SchedStatus
+from .scheduler import Scheduler, SchedStatus, Notification, NotificationType
from .jobs import ElementJob, JobStatus
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 4f668c6..a9286d4 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -21,6 +21,7 @@
# System imports
import os
import asyncio
+import enum
from itertools import chain
import signal
import datetime
@@ -45,6 +46,34 @@ _ACTION_NAME_CLEANUP = 'clean'
_ACTION_NAME_CACHE_SIZE = 'size'
+@enum.unique
+class NotificationType(enum.Enum):
+ INTERRUPT = "interrupt"
+ JOB_START = "job_start"
+ JOB_COMPLETE = "job_complete"
+ TICK = "tick"
+
+
+class Notification:
+
+ def __init__(self,
+ notification_type,
+ *,
+ full_name=None,
+ job_action=None,
+ job_status=None,
+ failed_element=False,
+ elapsed_time=None,
+ element=None):
+ self.notification_type = notification_type
+ self.full_name = full_name
+ self.job_action = job_action
+ self.job_status = job_status
+ self.failed_element = failed_element
+ self.elapsed_time = elapsed_time
+ self.element = element
+
+
# Scheduler()
#
# The scheduler operates on a list queues, each of which is meant to accomplish
@@ -69,7 +98,7 @@ _ACTION_NAME_CACHE_SIZE = 'size'
class Scheduler():
def __init__(self, context,
- start_time, state,
+ start_time, state, message_handler,
interrupt_callback=None,
ticker_callback=None,
interactive_failure=False):
@@ -102,9 +131,17 @@ class Scheduler():
self._cleanup_scheduled = False # Whether we have a cleanup job scheduled
self._cleanup_running = None # A running CleanupJob, or None
- # Callbacks to report back to the Scheduler owner
- self._interrupt_callback = interrupt_callback
- self._ticker_callback = ticker_callback
+ # Callback to send messages to report back to the Scheduler's owner
+ self.message = message_handler
+
+ # Whether our exclusive jobs, like 'cleanup' are currently already
+ # waiting or active.
+ #
+ # This is just a bit quicker than scanning the wait queue and active
+ # queue and comparing job action names.
+ #
+ self._exclusive_waiting = set()
+ self._exclusive_active = set()
self.resources = Resources(context.sched_builders,
context.sched_fetchers,
@@ -134,8 +171,7 @@ class Scheduler():
asyncio.set_event_loop(self.loop)
# Add timeouts
- if self._ticker_callback:
- self.loop.call_later(1, self._tick)
+ self.loop.call_later(1, self._tick)
# Handle unix signals while running
self._connect_signals()
@@ -254,13 +290,19 @@ class Scheduler():
# Remove from the active jobs list
self._active_jobs.remove(job)
- self._state.remove_task(job.action_name, job.name)
if status == JobStatus.FAIL:
# If it's an elementjob, we want to compare against the failure messages
# and send the Element() instance if interactive failure handling. Note
# this may change if the frontend is run in a separate process for pickling
element = job._element if (job.element_job and self._interactive_failure) else None
- self._state.fail_task(job.action_name, job.name, element_job=job.element_job, element=element)
+
+ message = Notification(NotificationType.JOB_COMPLETE,
+ full_name=job.name,
+ job_action=job.action_name,
+ job_status=status,
+ failed_element=job.element_job,
+ element=element)
+ self.message(message)
# Now check for more jobs
self._sched()
@@ -319,7 +361,11 @@ class Scheduler():
#
def _start_job(self, job):
self._active_jobs.append(job)
- self._state.add_task(job.action_name, job.name, self.elapsed_time())
+ message = Notification(NotificationType.JOB_START,
+ full_name=job.name,
+ job_action=job.action_name,
+ elapsed_time=self.elapsed_time())
+ self.message(message)
job.start()
# Callback for the cache size job
@@ -538,13 +584,8 @@ class Scheduler():
if self.terminated:
return
- # Leave this to the frontend to decide, if no
- # interrrupt callback was specified, then just terminate.
- if self._interrupt_callback:
- self._interrupt_callback()
- else:
- # Default without a frontend is just terminate
- self.terminate_jobs()
+ message = Notification(NotificationType.INTERRUPT)
+ self.message(message)
# _terminate_event():
#
@@ -603,7 +644,8 @@ class Scheduler():
# Regular timeout for driving status in the UI
def _tick(self):
- self._ticker_callback()
+ message = Notification(NotificationType.TICK)
+ self.message(message)
self.loop.call_later(1, self._tick)
def __getstate__(self):
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 5f12889..45bb41b 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -36,7 +36,7 @@ from ._artifactelement import verify_artifact_ref, ArtifactElement
from ._exceptions import StreamError, ImplError, BstError, ArtifactElementError, ArtifactError
from ._message import Message, MessageType
from ._scheduler import Scheduler, SchedStatus, TrackQueue, FetchQueue, \
- SourcePushQueue, BuildQueue, PullQueue, ArtifactPushQueue
+ SourcePushQueue, BuildQueue, PullQueue, ArtifactPushQueue, NotificationType, JobStatus
from ._pipeline import Pipeline, PipelineSelection
from ._profile import Topics, PROFILER
from ._state import State
@@ -85,12 +85,14 @@ class Stream():
context.messenger.set_state(self._state)
- self._scheduler = Scheduler(context, session_start, self._state,
+ self._scheduler = Scheduler(context, session_start, self._state, self._scheduler_notification_handler,
interrupt_callback=interrupt_callback,
ticker_callback=ticker_callback,
interactive_failure=interactive_failure)
self._first_non_track_queue = None
self._session_start_callback = session_start_callback
+ self._ticker_callback = ticker_callback
+ self._interrupt_callback = interrupt_callback
# init()
#
@@ -1572,6 +1574,25 @@ class Stream():
return element_targets, artifact_refs
+ def _scheduler_notification_handler(self, notification):
+ if notification.notification_type == NotificationType.INTERRUPT:
+ self._interrupt_callback()
+ elif notification.notification_type == NotificationType.TICK:
+ self._ticker_callback()
+ elif notification.notification_type == NotificationType.JOB_START:
+ self._state.add_task(notification.job_action, notification.full_name, notification.elapsed_time)
+
+ elif notification.notification_type == NotificationType.JOB_COMPLETE:
+ self._state.remove_task(notification.job_action, notification.full_name)
+ if notification.job_status == JobStatus.FAIL:
+ if notification.failed_element:
+ unique_id = notification.full_name
+ else:
+ unique_id = None
+ self._state.fail_task(notification.job_action, notification.full_name, unique_id)
+ else:
+ raise StreamError("Unreccognised notification type recieved")
+
def __getstate__(self):
# The only use-cases for pickling in BuildStream at the time of writing
# are enabling the 'spawn' method of starting child processes, and
[buildstream] 12/19: Change subprocess.run to start
Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
not-in-ldap pushed a commit to branch phil/ui-split-refactor
in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit be6e68ff519c3185252149a98d59f42dc72925bd
Author: Phil Dawson <ph...@codethink.co.uk>
AuthorDate: Thu Jul 4 14:44:29 2019 +0100
Change subprocess.run to start
---
src/buildstream/_stream.py | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index d2a4d08..0fe2234 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -66,7 +66,7 @@ def subprocessed(func):
target = functools.partial(func, self, *args, **kwargs)
print("launching subprocess:", process_name)
self._subprocess = mp_context.Process(target=target, name=process_name)
- self._subprocess.run()
+ self._subprocess.start()
# TODO connect signal handlers
[buildstream] 16/19: fixup! fixup! fixup! Move subprocess machinery
into a method
Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
not-in-ldap pushed a commit to branch phil/ui-split-refactor
in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit 9b491a9e2f7ac60da0e0257841f44545475a194c
Author: Phil Dawson <ph...@codethink.co.uk>
AuthorDate: Fri Jul 5 15:51:54 2019 +0100
fixup! fixup! fixup! Move subprocess machinery into a method
---
src/buildstream/_stream.py | 2 --
1 file changed, 2 deletions(-)
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index ed83747..89d6539 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -114,9 +114,7 @@ class Stream():
global notification_count
notification_count = 0
- # TODO use functools to pass arguments to func to make target for subprocess
- # Start subprocessed work
mp_context = mp.get_context(method='fork')
process_name = "stream-{}".format(func.__name__)
print("launchinglaunching subprocess:", process_name)
[buildstream] 06/19: Add workaround for buildqueue job_complete
direct callback
Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
not-in-ldap pushed a commit to branch phil/ui-split-refactor
in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit af619ea27e6da79d809f436c93a5ba8faa7f7738
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Tue Jun 18 12:34:22 2019 +0100
Add workaround for buildqueue job_complete direct callback
---
src/buildstream/_scheduler/scheduler.py | 14 +++++++++-----
1 file changed, 9 insertions(+), 5 deletions(-)
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index a9286d4..5bf0764 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -284,11 +284,14 @@ class Scheduler():
# queue (Queue): The Queue holding a complete job
# job (Job): The completed Job
# status (JobStatus): The status of the completed job
+ # process_jobs (bool): If the scheduler should also process the
+ # job, else just generate the notification
#
- def job_completed(self, job, status):
+ def job_completed(self, job, status, process_jobs=True):
- # Remove from the active jobs list
- self._active_jobs.remove(job)
+ if process_jobs:
+ # Remove from the active jobs list
+ self._active_jobs.remove(job)
if status == JobStatus.FAIL:
# If it's an elementjob, we want to compare against the failure messages
@@ -304,8 +307,9 @@ class Scheduler():
element=element)
self.message(message)
- # Now check for more jobs
- self._sched()
+ if process_jobs:
+ # Now check for more jobs
+ self._sched()
# check_cache_size():
#
[buildstream] 02/19: plugin.py: cache full_name member in __init__
Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
not-in-ldap pushed a commit to branch phil/ui-split-refactor
in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit c5db3d822a2433b97ee52e9108da135edf4ba13e
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Tue Aug 6 13:15:58 2019 +0100
plugin.py: cache full_name member in __init__
Once project & type are resolved, the full_name can be computed
and cached for efficiency. The accessor for getting the private
member should also be moved to the correct section.
---
src/buildstream/plugin.py | 29 ++++++++++++++++++++++-------
1 file changed, 22 insertions(+), 7 deletions(-)
diff --git a/src/buildstream/plugin.py b/src/buildstream/plugin.py
index d963916..506eba5 100644
--- a/src/buildstream/plugin.py
+++ b/src/buildstream/plugin.py
@@ -246,6 +246,9 @@ class Plugin():
self.__type_tag = type_tag # The type of plugin (element or source)
self.__configuring = False # Whether we are currently configuring
+ # Set the full_name as project & type_tag are resolved
+ self.__full_name = self.__set_full_name()
+
# Infer the kind identifier
modulename = type(self).__module__
self.__kind = modulename.split('.')[-1]
@@ -672,6 +675,18 @@ class Plugin():
def _preflight(self):
self.preflight()
+ # _get_full_name():
+ #
+ # The instance full name of the plugin prepended with the owning
+ # junction if appropriate. This being the name of the given element,
+ # as opposed to the class name of the underlying plugin __kind identifier.
+ #
+ # Returns:
+ # (str): element full name, with prepended owning junction if appropriate
+ #
+ def _get_full_name(self):
+ return self.__full_name
+
# _get_args_for_child_job_pickling(self)
#
# Return data necessary to reconstruct this object in a child job process.
@@ -728,13 +743,6 @@ class Plugin():
output.flush()
self.status('Running host command', detail=command)
- def _get_full_name(self):
- project = self.__project
- if project.junction:
- return '{}:{}'.format(project.junction.name, self.name)
- else:
- return self.name
-
def __deprecation_warning_silenced(self):
if not self.BST_PLUGIN_DEPRECATED:
return False
@@ -751,6 +759,13 @@ class Plugin():
return self.get_kind() in silenced_warnings
+ def __set_full_name(self):
+ project = self.__project
+ if project.junction:
+ return '{}:{}'.format(project.junction.name, self.name)
+ else:
+ return self.name
+
# A local table for _prefix_warning()
#
[buildstream] 15/19: fixup! fixup! Move subprocess machinery into a
method
Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
not-in-ldap pushed a commit to branch phil/ui-split-refactor
in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit d851dec4e2166291a184cf85fdc474ce1a64448f
Author: Phil Dawson <ph...@codethink.co.uk>
AuthorDate: Fri Jul 5 15:45:00 2019 +0100
fixup! fixup! Move subprocess machinery into a method
---
src/buildstream/_stream.py | 4 +---
1 file changed, 1 insertion(+), 3 deletions(-)
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 1de8364..ed83747 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -106,8 +106,6 @@ class Stream():
def init(self):
self._artifacts = self._context.artifactcache
self._sourcecache = self._context.sourcecache
- print(Stream.build, Stream.build.__qualname__, Stream.build.__name__, Stream.build.__module__,
- id(Stream.build))
def run_in_subprocess(self, func, *args, **kwargs):
print("Args: {}".format([*args]))
@@ -126,7 +124,7 @@ class Stream():
self._subprocess.start()
# TODO connect signal handlers
- while self._subprocess.exitcode is not None:
+ while self._subprocess.exitcode is not:
self._subprocess.join(0.1)
self._loop()
print("Stopping loop...")
[buildstream] 17/19: Some tweaks
Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
not-in-ldap pushed a commit to branch phil/ui-split-refactor
in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit c2dafd2dc11af73838ae19e337f94687244f60cc
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Mon Jul 8 12:34:05 2019 +0100
Some tweaks
---
src/buildstream/_stream.py | 7 +++----
1 file changed, 3 insertions(+), 4 deletions(-)
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 89d6539..afbeb20 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -112,9 +112,6 @@ class Stream():
print("Kwargs: {}".format(list(kwargs.items())))
assert not self._subprocess
- global notification_count
- notification_count = 0
-
mp_context = mp.get_context(method='fork')
process_name = "stream-{}".format(func.__name__)
print("launchinglaunching subprocess:", process_name)
@@ -122,8 +119,10 @@ class Stream():
self._subprocess.start()
# TODO connect signal handlers
- while self._subprocess.exitcode is not:
+ while self._subprocess.exitcode is None:
+ # check every given time interval on subprocess state
self._subprocess.join(0.1)
+ # if no exit code, go back to checking the message queue
self._loop()
print("Stopping loop...")
[buildstream] 10/19: stream.py: Minimal implementation of event
loop function
Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
not-in-ldap pushed a commit to branch phil/ui-split-refactor
in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit 48847cdffdee48e8698011be6bdc1660eb08ae14
Author: Phil Dawson <ph...@codethink.co.uk>
AuthorDate: Thu Jun 20 11:12:57 2019 +0100
stream.py: Minimal implementation of event loop function
---
src/buildstream/_stream.py | 15 +++++++++++++++
1 file changed, 15 insertions(+)
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 4a1edcf..e8b3931 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -1623,3 +1623,18 @@ class Stream():
# subprocessed work has completed
# Return result of subprocessed function
+
+ # The code to be run by the Stream's event loop while delegating
+ # work to a subprocess with the @subprocessed decorator
+ def _loop(self):
+ assert self._notification_queue
+ # Check that the subprocessed work has not finished
+ # TODO
+
+ # Check for new messages
+ notification = self._notification_queue.get(block=True, timeout=0.1)
+
+ # Process new messages
+ if notification:
+ self._scheduler_notification_handler(notification)
+
[buildstream] 18/19: Support exception handling from the subprocess
Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
not-in-ldap pushed a commit to branch phil/ui-split-refactor
in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit 07bb725bbf4817a4982f3eaeb5a4268bade3e9c7
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Mon Jul 15 11:51:18 2019 +0100
Support exception handling from the subprocess
---
src/buildstream/_exceptions.py | 3 +-
src/buildstream/_scheduler/jobs/job.py | 2 +
src/buildstream/_scheduler/queues/queue.py | 1 +
src/buildstream/_scheduler/scheduler.py | 27 +++++++++++++-
src/buildstream/_stream.py | 59 ++++++++++++++++++++++--------
src/buildstream/testing/runcli.py | 2 +-
src/buildstream/utils.py | 4 ++
7 files changed, 80 insertions(+), 18 deletions(-)
diff --git a/src/buildstream/_exceptions.py b/src/buildstream/_exceptions.py
index d5b87a8..e977660 100644
--- a/src/buildstream/_exceptions.py
+++ b/src/buildstream/_exceptions.py
@@ -238,7 +238,8 @@ class LoadErrorReason(Enum):
# interpreting project YAML
#
class LoadError(BstError):
- def __init__(self, message, reason, *, detail=None):
+ def __init__(self, message, reason=None, *, detail=None):
+ # Second parameter needs to be a default arg due to unpickling issue, unpleasant.
super().__init__(message, detail=detail, domain=ErrorDomain.LOAD, reason=reason)
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index d80d1a9..9e3ca13 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -488,6 +488,8 @@ class Job():
# is currently managed in _exceptions.py
set_last_task_error(envelope.message['domain'],
envelope.message['reason'])
+ self._scheduler.set_last_task_error(envelope.message['domain'],
+ envelope.message['reason'])
elif envelope.message_type is _MessageType.RESULT:
assert self._result is None
self._result = envelope.message
diff --git a/src/buildstream/_scheduler/queues/queue.py b/src/buildstream/_scheduler/queues/queue.py
index 8c81ce2..3435e3f 100644
--- a/src/buildstream/_scheduler/queues/queue.py
+++ b/src/buildstream/_scheduler/queues/queue.py
@@ -316,6 +316,7 @@ class Queue():
# This just allows us stronger testing capability
#
set_last_task_error(e.domain, e.reason)
+ self._scheduler.set_last_task_error(e.domain, e.reason)
except Exception: # pylint: disable=broad-except
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 2e9c740..6649865 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -53,6 +53,8 @@ class NotificationType(enum.Enum):
JOB_COMPLETE = "job_complete"
TICK = "tick"
EXCEPTION = "exception"
+ TASK_ERROR = "task_error"
+ SCHED_TERMINATE = "sched_terminate"
class Notification:
@@ -66,7 +68,9 @@ class Notification:
failed_element=False,
elapsed_time=None,
element=None,
- exception=None):
+ exception=None,
+ domain=None,
+ reason=None):
self.notification_type = notification_type
self.full_name = full_name
@@ -76,6 +80,8 @@ class Notification:
self.elapsed_time = elapsed_time
self.element = element
self.exception = exception
+ self.domain = domain
+ self.reason = reason
# Scheduler()
@@ -331,6 +337,12 @@ class Scheduler():
#
self._cache_size_scheduled = True
+ def set_last_task_error(self, domain, reason):
+ notification = Notification(NotificationType.TASK_ERROR,
+ domain=domain,
+ reason=reason)
+ self._notify(notification)
+
#######################################################
# Local Private Methods #
#######################################################
@@ -673,3 +685,16 @@ class Scheduler():
# a new use-case arises.
#
raise TypeError("Scheduler objects should not be pickled.")
+
+ def _loop(self):
+ assert self._notification_queue
+ # Check for and process new messages
+ while True:
+ try:
+ notification = self._notification_queue.get_nowait()
+ if notification.notification_type == NotificationType.SCHED_TERMINATE:
+ print("handling notifications")
+ self.terminate_jobs()
+ except queue.Empty:
+ notification = None
+ break
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index afbeb20..4de975e 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -35,7 +35,7 @@ import queue
from ._artifact import Artifact
from ._artifactelement import verify_artifact_ref, ArtifactElement
-from ._exceptions import StreamError, ImplError, BstError, ArtifactElementError, ArtifactError
+from ._exceptions import StreamError, ImplError, BstError, ArtifactElementError, ArtifactError, set_last_task_error
from ._message import Message, MessageType
from ._scheduler import Scheduler, SchedStatus, TrackQueue, FetchQueue, \
SourcePushQueue, BuildQueue, PullQueue, ArtifactPushQueue, NotificationType, JobStatus
@@ -107,6 +107,16 @@ class Stream():
self._artifacts = self._context.artifactcache
self._sourcecache = self._context.sourcecache
+ @staticmethod
+ def _subprocess_main(func, queue, *args, **kwargs):
+ # Set main process
+ utils._reset_main_pid()
+ try:
+ func(*args, **kwargs)
+ except Exception as e:
+ from ._scheduler.scheduler import Notification, NotificationType
+ queue.put(Notification(NotificationType.EXCEPTION, exception=e))
+
def run_in_subprocess(self, func, *args, **kwargs):
print("Args: {}".format([*args]))
print("Kwargs: {}".format(list(kwargs.items())))
@@ -114,11 +124,16 @@ class Stream():
mp_context = mp.get_context(method='fork')
process_name = "stream-{}".format(func.__name__)
- print("launchinglaunching subprocess:", process_name)
- self._subprocess = mp_context.Process(target=func, args=args, kwargs=kwargs, name=process_name)
+ args = list(args)
+ args.insert(0, self._notification_queue)
+ args.insert(0, func)
+ print("launching subprocess:", process_name)
+ self._subprocess = mp_context.Process(target=Stream._subprocess_main, args=args,
+ kwargs=kwargs, name=process_name)
+
self._subprocess.start()
- # TODO connect signal handlers
+ # TODO connect signal handlers with asyncio
while self._subprocess.exitcode is None:
# check every given time interval on subprocess state
self._subprocess.join(0.1)
@@ -126,13 +141,17 @@ class Stream():
self._loop()
print("Stopping loop...")
- # try:
- # while True:
- # notification = self._notification_queue.get_nowait()
- # self._scheduler_notification_handler(notification)
- # except queue.Empty:
- # print("Finished processing notifications")
- # pass
+ # Set main process back
+ utils._reset_main_pid()
+
+ # Ensure no more notifications to process
+ try:
+ while True:
+ notification = self._notification_queue.get_nowait()
+ self._scheduler_notification_handler(notification)
+ except queue.Empty:
+ print("Finished processing notifications")
+ pass
# cleanup()
#
@@ -1086,7 +1105,15 @@ class Stream():
# Terminate jobs
#
def terminate(self):
+ #if self._scheduler.loop:
+ # Scheduler not in subprocess
self._scheduler.terminate_jobs()
+ #else:
+ # # Handle calling subprocessed scheduler outside of main process
+ # assert self._notification_queue
+ # from ._scheduler.scheduler import Notification, NotificationType
+ # self._notification_queue.put(Notification(NotificationType.SCHED_TERMINATE))
+ # self._scheduler.terminate_jobs()
# quit()
#
@@ -1625,8 +1652,9 @@ class Stream():
unique_id = None
self._state.fail_task(notification.job_action, notification.full_name, unique_id)
elif notification.notification_type == NotificationType.EXCEPTION:
- # TODO
- pass
+ raise notification.exception
+ elif notification.notification_type == NotificationType.TASK_ERROR:
+ set_last_task_error(notification.domain, notification.reason)
else:
raise StreamError("Unreccognised notification type recieved")
@@ -1653,8 +1681,9 @@ class Stream():
while True:
try:
notification = self._notification_queue.get_nowait()
- print("handling notifications")
- self._scheduler_notification_handler(notification)
+ if notification.notification_type != NotificationType.SCHED_TERMINATE:
+ print("handling notifications")
+ self._scheduler_notification_handler(notification)
except queue.Empty:
notification = None
break
diff --git a/src/buildstream/testing/runcli.py b/src/buildstream/testing/runcli.py
index 95bf83e..ad71705 100644
--- a/src/buildstream/testing/runcli.py
+++ b/src/buildstream/testing/runcli.py
@@ -85,7 +85,6 @@ class Result():
# in the case that the exit code reported is 0 (success).
#
if self.exit_code != 0:
-
# Check if buildstream failed to handle an
# exception, topevel CLI exit should always
# be a SystemExit exception.
@@ -149,6 +148,7 @@ class Result():
self.exception.domain,
self.exception.reason
))
+
assert self.exit_code == -1, fail_message
assert self.exc is not None, fail_message
assert self.exception is not None, fail_message
diff --git a/src/buildstream/utils.py b/src/buildstream/utils.py
index 2c57925..ed1e123 100644
--- a/src/buildstream/utils.py
+++ b/src/buildstream/utils.py
@@ -711,6 +711,10 @@ def _is_main_process():
assert _main_pid is not None
return os.getpid() == _main_pid
+def _reset_main_pid():
+ global _main_pid
+ _main_pid = os.getpid()
+
# Recursively remove directories, ignoring file permissions as much as
# possible.