Posted to commits@buildstream.apache.org by no...@apache.org on 2020/12/29 12:30:54 UTC

[buildstream] branch phil/ui-split-refactor created (now dc86301)

This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a change to branch phil/ui-split-refactor
in repository https://gitbox.apache.org/repos/asf/buildstream.git.


      at dc86301  Add workarounds for queue querying in main process

This branch includes the following new commits:

     new 456da57  element.py: Remove redundant second call to _get_cache_key()
     new c5db3d8  plugin.py: cache full_name member in __init__
     new df095d5  _message.py: Use element_name & element_key instead of unique_id
     new b9bcb00  _message.py: Use element_name & element_key instead of unique_id
     new b7b445c  WIP: Refactor scheduler-frontend communication
     new af619ea  Add workaround for buildqueue job_complete direct callback
     new ed0eabe  Refer to stream-scheduler communication as notifications
     new e39c940  Send scheduler notifications over a multiprocessing queue
     new e0561a7  Pseudocode for subprocess context manager
     new 48847cd  stream.py: Minimal implementation of event loop function
     new d12e62d  WIP: Add a way to run stream methods in a subprocess
     new be6e68f  Change subprocess.run to start
     new 8b32793  Move subprocess machinery into a method
     new f39f4ef  fixup! Move subprocess machinery into a method
     new d851dec  fixup! fixup! Move subprocess machinery into a method
     new 9b491a9  fixup! fixup! fixup! Move subprocess machinery into a method
     new c2dafd2  Some tweaks
     new 07bb725  Support exception handling from the subprocess
     new dc86301  Add workarounds for queue querying in main process

The 19 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[buildstream] 03/19: _message.py: Use element_name & element_key instead of unique_id

Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch phil/ui-split-refactor
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit df095d5342adb9eccb8d904bb228666a947bb04a
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Wed Jul 31 16:31:47 2019 +0100

    _message.py: Use element_name & element_key instead of unique_id
    
    Adding the element full name and display key to all element-related
    messages removes the need to look up the plugin table via a plugin
    unique_id just to retrieve the same values for logging and widget
    frontend display. Relying on plugin table state is also incompatible
    with running the frontend in a different process, since the table
    would then exist in multiple states. An Element instance is now
    passed only when handling interactive failures.
    
    The element full name is now displayed instead of the unique_id,
    such as in the debugging widget. It is also displayed in place of
    'name' (i.e. including any junction prefix) to be more informative.
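    
    For illustration, a minimal sketch of the new calling convention
    (names mirror the diff below; the import path and the exact keyword
    set are indicative rather than authoritative):
    
        from buildstream._message import Message, MessageType
    
        # Before: messages carried a plugin unique_id, and the frontend
        # had to look the plugin up in the plugin table to recover the
        # element name and display key for rendering.
        message = Message(element._unique_id, MessageType.STATUS,
                          "Staging sources")
    
        # After: the element full name and display key travel with the
        # message itself, so any process can render it without access
        # to the plugin table.
        message = Message(MessageType.STATUS, "Staging sources",
                          element_name=element._get_full_name(),
                          element_key=element._get_display_key())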
---
 src/buildstream/_basecache.py                 |   2 +-
 src/buildstream/_cas/cascache.py              |   9 +--
 src/buildstream/_frontend/app.py              |  28 +++----
 src/buildstream/_frontend/widget.py           |  40 +++++-----
 src/buildstream/_loader/loader.py             |   2 +-
 src/buildstream/_message.py                   |   9 ++-
 src/buildstream/_messenger.py                 |  39 +++++-----
 src/buildstream/_pipeline.py                  |   2 +-
 src/buildstream/_project.py                   |   3 +-
 src/buildstream/_scheduler/jobs/elementjob.py |   7 +-
 src/buildstream/_scheduler/jobs/job.py        | 101 ++++++++++++++------------
 src/buildstream/_scheduler/queues/queue.py    |   2 +-
 src/buildstream/_scheduler/scheduler.py       |  16 ++--
 src/buildstream/_state.py                     |  10 +--
 src/buildstream/_stream.py                    |   9 ++-
 src/buildstream/plugin.py                     |  13 +++-
 src/buildstream/sandbox/_sandboxremote.py     |   2 +-
 src/buildstream/sandbox/sandbox.py            |  23 +++---
 src/buildstream/source.py                     |   7 +-
 tests/frontend/logging.py                     |   2 +-
 20 files changed, 171 insertions(+), 155 deletions(-)

diff --git a/src/buildstream/_basecache.py b/src/buildstream/_basecache.py
index 52b777f..0ae64ad 100644
--- a/src/buildstream/_basecache.py
+++ b/src/buildstream/_basecache.py
@@ -244,7 +244,7 @@ class BaseCache():
     def _message(self, message_type, message, **kwargs):
         args = dict(kwargs)
         self.context.messenger.message(
-            Message(None, message_type, message, **args))
+            Message(message_type, message, **args))
 
     # _set_remotes():
     #
diff --git a/src/buildstream/_cas/cascache.py b/src/buildstream/_cas/cascache.py
index dbdfa41..9bb354a 100644
--- a/src/buildstream/_cas/cascache.py
+++ b/src/buildstream/_cas/cascache.py
@@ -1248,7 +1248,6 @@ class CASQuota:
                 available = utils._pretty_size(available_space)
 
             self._message(Message(
-                None,
                 MessageType.WARN,
                 "Your system does not have enough available " +
                 "space to support the cache quota specified.",
@@ -1294,7 +1293,7 @@ class CASQuota:
         # Start off with an announcement with as much info as possible
         volume_size, volume_avail = self._get_cache_volume_size()
         self._message(Message(
-            None, MessageType.STATUS, "Starting cache cleanup",
+            MessageType.STATUS, "Starting cache cleanup",
             detail=("Elements required by the current build plan:\n" + "{}\n" +
                     "User specified quota: {} ({})\n" +
                     "Cache usage: {}\n" +
@@ -1310,7 +1309,7 @@ class CASQuota:
         # Do a real computation of the cache size once, just in case
         self.compute_cache_size()
         usage = CASCacheUsage(self)
-        self._message(Message(None, MessageType.STATUS,
+        self._message(Message(MessageType.STATUS,
                               "Cache usage recomputed: {}".format(usage)))
 
         # Collect digests and their remove method
@@ -1330,7 +1329,7 @@ class CASQuota:
             space_saved += size
 
             self._message(Message(
-                None, MessageType.STATUS,
+                MessageType.STATUS,
                 "Freed {: <7} {}".format(
                     utils._pretty_size(size, dec_places=2),
                     ref)))
@@ -1373,7 +1372,7 @@ class CASQuota:
 
         # Informational message about the side effects of the cleanup
         self._message(Message(
-            None, MessageType.INFO, "Cleanup completed",
+            MessageType.INFO, "Cleanup completed",
             detail=("Removed {} refs and saving {} disk space.\n" +
                     "Cache usage is now: {}")
             .format(removed_ref_count,
diff --git a/src/buildstream/_frontend/app.py b/src/buildstream/_frontend/app.py
index a5588e6..e1b1d8d 100644
--- a/src/buildstream/_frontend/app.py
+++ b/src/buildstream/_frontend/app.py
@@ -31,7 +31,6 @@ from .. import Scope
 
 # Import various buildstream internals
 from .._context import Context
-from ..plugin import Plugin
 from .._project import Project
 from .._exceptions import BstError, StreamError, LoadError, LoadErrorReason, AppError
 from .._message import Message, MessageType, unconditional_messages
@@ -212,7 +211,8 @@ class App():
             self.stream = Stream(self.context, self._session_start,
                                  session_start_callback=self.session_start_cb,
                                  interrupt_callback=self._interrupt_handler,
-                                 ticker_callback=self._tick)
+                                 ticker_callback=self._tick,
+                                 interactive_failure=self._interactive_failures)
 
             self._state = self.stream.get_state()
 
@@ -474,7 +474,7 @@ class App():
     def _message(self, message_type, message, **kwargs):
         args = dict(kwargs)
         self.context.messenger.message(
-            Message(None, message_type, message, **args))
+            Message(message_type, message, **args))
 
     # Exception handler
     #
@@ -559,25 +559,24 @@ class App():
     # Args:
     #    action_name (str): The name of the action being performed,
     #                       same as the task group, if it exists
-    #    full_name (str): The name of this specific task, e.g. the element name
-    #    unique_id (int): If an element job failed, the unique ID of the element.
+    #    full_name (str): The name of this specific task, e.g. the element full name
+    #    element_job (bool): If an element job failed
+    #    element (Element): If an element job failed and interactive failure
+    #                       handling is enabled, the Element instance
     #
-    def _job_failed(self, action_name, full_name, unique_id=None):
+    def _job_failed(self, action_name, full_name, element_job=False, element=None):
         # Dont attempt to handle a failure if the user has already opted to
         # terminate
         if not self.stream.terminated:
-            if unique_id:
+            if element_job:
                 # look-up queue
                 for q in self.stream.queues:
                     if q.action_name == action_name:
                         queue = q
                 assert queue, "Job action {} does not have a corresponding queue".format(action_name)
 
-                # look-up element
-                element = Plugin._lookup(unique_id)
-
                 # Get the last failure message for additional context
-                failure = self._fail_messages.get(element._unique_id)
+                failure = self._fail_messages.get(full_name)
 
                 # XXX This is dangerous, sometimes we get the job completed *before*
                 # the failure message reaches us ??
@@ -585,11 +584,12 @@ class App():
                     self._status.clear()
                     click.echo("\n\n\nBUG: Message handling out of sync, " +
                                "unable to retrieve failure message for element {}\n\n\n\n\n"
-                               .format(element), err=True)
+                               .format(full_name), err=True)
                 else:
                     self._handle_failure(element, queue, failure)
 
             else:
+                # Not an element_job, we don't handle the failure
                 click.echo("\nTerminating all jobs\n", err=True)
                 self.stream.terminate()
 
@@ -739,8 +739,8 @@ class App():
             return
 
         # Hold on to the failure messages
-        if message.message_type in [MessageType.FAIL, MessageType.BUG] and message.unique_id is not None:
-            self._fail_messages[message.unique_id] = message
+        if message.message_type in [MessageType.FAIL, MessageType.BUG] and message.element_name is not None:
+            self._fail_messages[message.element_name] = message
 
         # Send to frontend if appropriate
         if is_silenced and (message.message_type not in unconditional_messages):
diff --git a/src/buildstream/_frontend/widget.py b/src/buildstream/_frontend/widget.py
index fbde249..31f69a5 100644
--- a/src/buildstream/_frontend/widget.py
+++ b/src/buildstream/_frontend/widget.py
@@ -27,11 +27,10 @@ from ruamel import yaml
 import click
 
 from .profile import Profile
-from .. import Element, Consistency, Scope
+from .. import Consistency, Scope
 from .. import __version__ as bst_version
 from .._exceptions import ImplError
 from .._message import MessageType
-from ..plugin import Plugin
 
 
 # These messages are printed a bit differently
@@ -110,12 +109,12 @@ class WallclockTime(Widget):
 class Debug(Widget):
 
     def render(self, message):
-        unique_id = 0 if message.unique_id is None else message.unique_id
+        element_name = "n/a" if message.element_name is None else message.element_name
 
         text = self.format_profile.fmt('pid:')
         text += self.content_profile.fmt("{: <5}".format(message.pid))
-        text += self.format_profile.fmt(" id:")
-        text += self.content_profile.fmt("{:0>3}".format(unique_id))
+        text += self.format_profile.fmt("element name:")
+        text += self.content_profile.fmt("{: <30}".format(element_name))
 
         return text
 
@@ -181,11 +180,9 @@ class ElementName(Widget):
 
     def render(self, message):
         action_name = message.action_name
-        element_id = message.task_id or message.unique_id
-        if element_id is not None:
-            plugin = Plugin._lookup(element_id)
-            name = plugin._get_full_name()
-            name = '{: <30}'.format(name)
+        element_name = message.element_name
+        if element_name is not None:
+            name = '{: <30}'.format(element_name)
         else:
             name = 'core activity'
             name = '{: <30}'.format(name)
@@ -215,18 +212,16 @@ class CacheKey(Widget):
 
     def render(self, message):
 
-        element_id = message.task_id or message.unique_id
         if not self._key_length:
             return ""
 
-        if element_id is None:
+        if message.element_name is None:
             return ' ' * self._key_length
 
         missing = False
         key = ' ' * self._key_length
-        plugin = Plugin._lookup(element_id)
-        if isinstance(plugin, Element):
-            _, key, missing = plugin._get_display_key()
+        if message.element_key:
+            _, key, missing = message.element_key
 
         if message.message_type in ERROR_MESSAGES:
             text = self._err_profile.fmt(key)
@@ -557,12 +552,12 @@ class LogLine(Widget):
         if self._failure_messages:
             values = OrderedDict()
 
-            for element, messages in sorted(self._failure_messages.items(), key=lambda x: x[0].name):
+            for element_name, messages in sorted(self._failure_messages.items()):
                 for group in self._state.task_groups.values():
                     # Exclude the failure messages if the job didn't ultimately fail
                     # (e.g. succeeded on retry)
-                    if element.name in group.failed_tasks:
-                        values[element.name] = ''.join(self._render(v) for v in messages)
+                    if element_name in group.failed_tasks:
+                        values[element_name] = ''.join(self._render(v) for v in messages)
 
             if values:
                 text += self.content_profile.fmt("Failure Summary\n", bold=True)
@@ -616,10 +611,9 @@ class LogLine(Widget):
     def render(self, message):
 
         # Track logfiles for later use
-        element_id = message.task_id or message.unique_id
-        if message.message_type in ERROR_MESSAGES and element_id is not None:
-            plugin = Plugin._lookup(element_id)
-            self._failure_messages[plugin].append(message)
+        element_name = message.element_name
+        if message.message_type in ERROR_MESSAGES and element_name is not None:
+            self._failure_messages[element_name].append(message)
 
         return self._render(message)
 
@@ -666,7 +660,7 @@ class LogLine(Widget):
         if message.detail:
 
             # Identify frontend messages, we never abbreviate these
-            frontend_message = not (message.task_id or message.unique_id)
+            frontend_message = not message.element_name
 
             # Split and truncate message detail down to message_lines lines
             lines = message.detail.splitlines(True)
diff --git a/src/buildstream/_loader/loader.py b/src/buildstream/_loader/loader.py
index 4b66288..061d28b 100644
--- a/src/buildstream/_loader/loader.py
+++ b/src/buildstream/_loader/loader.py
@@ -679,7 +679,7 @@ class Loader():
             if self.project._warning_is_fatal(warning_token):
                 raise LoadError(brief, warning_token)
 
-        message = Message(None, MessageType.WARN, brief)
+        message = Message(MessageType.WARN, brief)
         self._context.messenger.message(message)
 
     # Print warning messages if any of the specified elements have invalid names.
diff --git a/src/buildstream/_message.py b/src/buildstream/_message.py
index 7f1a939..195eba6 100644
--- a/src/buildstream/_message.py
+++ b/src/buildstream/_message.py
@@ -54,8 +54,9 @@ unconditional_messages = [
 #
 class Message():
 
-    def __init__(self, unique_id, message_type, message,
-                 task_id=None,
+    def __init__(self, message_type, message, *,
+                 element_name=None,
+                 element_key=None,
                  detail=None,
                  action_name=None,
                  elapsed=None,
@@ -64,14 +65,14 @@ class Message():
                  scheduler=False):
         self.message_type = message_type  # Message type
         self.message = message            # The message string
+        self.element_name = element_name   # The instance element name of the issuing plugin
+        self.element_key = element_key    # The display key of the issuing plugin element
         self.detail = detail              # An additional detail string
         self.action_name = action_name    # Name of the task queue (fetch, refresh, build, etc)
         self.elapsed = elapsed            # The elapsed time, in timed messages
         self.logfile = logfile            # The log file path where commands took place
         self.sandbox = sandbox            # The error that caused this message used a sandbox
         self.pid = os.getpid()            # The process pid
-        self.unique_id = unique_id        # The plugin object ID issueing the message
-        self.task_id = task_id            # The plugin object ID of the task
         self.scheduler = scheduler        # Whether this is a scheduler level message
         self.creation_time = datetime.datetime.now()
         if message_type in (MessageType.SUCCESS, MessageType.FAIL):
diff --git a/src/buildstream/_messenger.py b/src/buildstream/_messenger.py
index d768abf..be5f12c 100644
--- a/src/buildstream/_messenger.py
+++ b/src/buildstream/_messenger.py
@@ -25,7 +25,6 @@ from . import _signals
 from . import utils
 from ._exceptions import BstError
 from ._message import Message, MessageType
-from .plugin import Plugin
 
 
 _RENDER_INTERVAL = datetime.timedelta(seconds=1)
@@ -149,16 +148,16 @@ class Messenger():
     #
     # Args:
     #    activity_name (str): The name of the activity
-    #    unique_id (int): Optionally, the unique id of the plugin related to the message
+    #    element_name (str): Optionally, the element full name of the plugin related to the message
     #    detail (str): An optional detailed message, can be multiline output
     #    silent_nested (bool): If True, all but _message.unconditional_messages are silenced
     #
     @contextmanager
-    def timed_activity(self, activity_name, *, unique_id=None, detail=None, silent_nested=False):
+    def timed_activity(self, activity_name, *, element_name=None, detail=None, silent_nested=False):
         with self._timed_suspendable() as timedata:
             try:
                 # Push activity depth for status messages
-                message = Message(unique_id, MessageType.START, activity_name, detail=detail)
+                message = Message(MessageType.START, activity_name, detail=detail, element_name=element_name)
                 self.message(message)
                 with self.silence(actually_silence=silent_nested):
                     yield
@@ -167,12 +166,12 @@ class Messenger():
                 # Note the failure in status messages and reraise, the scheduler
                 # expects an error when there is an error.
                 elapsed = datetime.datetime.now() - timedata.start_time
-                message = Message(unique_id, MessageType.FAIL, activity_name, elapsed=elapsed)
+                message = Message(MessageType.FAIL, activity_name, elapsed=elapsed, element_name=element_name)
                 self.message(message)
                 raise
 
             elapsed = datetime.datetime.now() - timedata.start_time
-            message = Message(unique_id, MessageType.SUCCESS, activity_name, elapsed=elapsed)
+            message = Message(MessageType.SUCCESS, activity_name, elapsed=elapsed, element_name=element_name)
             self.message(message)
 
     # simple_task()
@@ -181,7 +180,7 @@ class Messenger():
     #
     # Args:
     #    activity_name (str): The name of the activity
-    #    unique_id (int): Optionally, the unique id of the plugin related to the message
+    #    element_name (str): Optionally, the element full name of the plugin related to the message
     #    full_name (str): Optionally, the distinguishing name of the activity, e.g. element name
     #    silent_nested (bool): If True, all but _message.unconditional_messages are silenced
     #
@@ -189,10 +188,10 @@ class Messenger():
     #    Task: A Task object that represents this activity, principally used to report progress
     #
     @contextmanager
-    def simple_task(self, activity_name, *, unique_id=None, full_name=None, silent_nested=False):
+    def simple_task(self, activity_name, *, element_name=None, full_name=None, silent_nested=False):
         # Bypass use of State when none exists (e.g. tests)
         if not self._state:
-            with self.timed_activity(activity_name, unique_id=unique_id, silent_nested=silent_nested):
+            with self.timed_activity(activity_name, element_name=element_name, silent_nested=silent_nested):
                 yield
             return
 
@@ -201,7 +200,7 @@ class Messenger():
 
         with self._timed_suspendable() as timedata:
             try:
-                message = Message(unique_id, MessageType.START, activity_name)
+                message = Message(MessageType.START, activity_name, element_name=element_name)
                 self.message(message)
 
                 task = self._state.add_task(activity_name, full_name)
@@ -215,7 +214,7 @@ class Messenger():
 
             except BstError:
                 elapsed = datetime.datetime.now() - timedata.start_time
-                message = Message(unique_id, MessageType.FAIL, activity_name, elapsed=elapsed)
+                message = Message(MessageType.FAIL, activity_name, elapsed=elapsed, element_name=element_name)
                 self.message(message)
                 raise
             finally:
@@ -232,7 +231,8 @@ class Messenger():
                     detail = "{} subtasks processed".format(task.current_progress)
             else:
                 detail = None
-            message = Message(unique_id, MessageType.SUCCESS, activity_name, elapsed=elapsed, detail=detail)
+            message = Message(MessageType.SUCCESS, activity_name, elapsed=elapsed, detail=detail,
+                              element_name=element_name)
             self.message(message)
 
     # recorded_messages()
@@ -336,13 +336,12 @@ class Messenger():
         EMPTYTIME = "--:--:--"
         template = "[{timecode: <8}] {type: <7}"
 
-        # If this message is associated with a plugin, print what
-        # we know about the plugin.
-        plugin_name = ""
-        if message.unique_id:
-            template += " {plugin}"
-            plugin = Plugin._lookup(message.unique_id)
-            plugin_name = plugin.name
+        # If this message is associated with an element or source plugin, print the
+        # full element name of the instance.
+        element_name = ""
+        if message.element_name:
+            template += " {element_name}"
+            element_name = message.element_name
 
         template += ": {message}"
 
@@ -359,7 +358,7 @@ class Messenger():
             timecode = "{0:02d}:{1:02d}:{2:02d}".format(hours, minutes, seconds)
 
         text = template.format(timecode=timecode,
-                               plugin=plugin_name,
+                               element_name=element_name,
                                type=message.message_type.upper(),
                                message=message.message,
                                detail=detail)
diff --git a/src/buildstream/_pipeline.py b/src/buildstream/_pipeline.py
index af60ffd..b6896b3 100644
--- a/src/buildstream/_pipeline.py
+++ b/src/buildstream/_pipeline.py
@@ -485,7 +485,7 @@ class Pipeline():
     def _message(self, message_type, message, **kwargs):
         args = dict(kwargs)
         self._context.messenger.message(
-            Message(None, message_type, message, **args))
+            Message(message_type, message, **args))
 
 
 # _Planner()
diff --git a/src/buildstream/_project.py b/src/buildstream/_project.py
index 9428ab4..96635f3 100644
--- a/src/buildstream/_project.py
+++ b/src/buildstream/_project.py
@@ -459,7 +459,7 @@ class Project():
             ]
             detail += "\n".join(lines)
             self._context.messenger.message(
-                Message(None, MessageType.WARN, "Ignoring redundant source references", detail=detail))
+                Message(MessageType.WARN, "Ignoring redundant source references", detail=detail))
 
         return elements
 
@@ -685,7 +685,6 @@ class Project():
         if not fail_on_overlap.is_none():
             self._context.messenger.message(
                 Message(
-                    None,
                     MessageType.WARN,
                     "Use of fail-on-overlap within project.conf " +
                     "is deprecated. Consider using fatal-warnings instead."
diff --git a/src/buildstream/_scheduler/jobs/elementjob.py b/src/buildstream/_scheduler/jobs/elementjob.py
index 1384486..6bcb9de 100644
--- a/src/buildstream/_scheduler/jobs/elementjob.py
+++ b/src/buildstream/_scheduler/jobs/elementjob.py
@@ -68,14 +68,15 @@ class ElementJob(Job):
     def __init__(self, *args, element, queue, action_cb, complete_cb, **kwargs):
         super().__init__(*args, **kwargs)
         self.set_name(element._get_full_name())
+        self.element_job = True
         self.queue = queue
         self._element = element
         self._action_cb = action_cb            # The action callable function
         self._complete_cb = complete_cb        # The complete callable function
 
-        # Set the ID for logging purposes
-        self.set_message_unique_id(element._unique_id)
-        self.set_task_id(element._unique_id)
+        # Set the plugin element name & key for logging purposes
+        self.set_message_element_name(self.name)
+        self.set_message_element_key(self._element._get_display_key())
 
     @property
     def element(self):
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index 2e6c7bb..d80d1a9 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -131,6 +131,7 @@ class Job():
         self.name = None                 # The name of the job, set by the job's subclass
         self.action_name = action_name   # The action name for the Queue
         self.child_data = None           # Data to be sent to the main process
+        self.element_job = False         # If the job is an ElementJob
 
         #
         # Private members
@@ -147,8 +148,8 @@ class Job():
         self._terminated = False               # Whether this job has been explicitly terminated
 
         self._logfile = logfile
-        self._message_unique_id = None
-        self._task_id = None
+        self._message_element_name = None      # The plugin instance element name for messaging
+        self._message_element_key = None      # The element key for messaging
 
     # set_name()
     #
@@ -174,8 +175,8 @@ class Job():
             self._logfile,
             self._max_retries,
             self._tries,
-            self._message_unique_id,
-            self._task_id,
+            self._message_element_name,
+            self._message_element_key
         )
 
         # Make sure that picklability doesn't break, by exercising it during
@@ -311,36 +312,27 @@ class Job():
             os.kill(self._process.pid, signal.SIGCONT)
             self._suspended = False
 
-    # set_message_unique_id()
+    # set_message_element_name()
     #
-    # This is called by Job subclasses to set the plugin ID
-    # issuing the message (if an element is related to the Job).
+    # This is called by Job subclasses to set the plugin instance element
+    # name issuing the message (if an element is related to the Job).
     #
     # Args:
-    #     unique_id (int): The id to be supplied to the Message() constructor
+    #     element_name (str): The element_name to be supplied to the Message() constructor
     #
-    def set_message_unique_id(self, unique_id):
-        self._message_unique_id = unique_id
+    def set_message_element_name(self, element_name):
+        self._message_element_name = element_name
 
-    # set_task_id()
+    # set_message_element_key()
     #
-    # This is called by Job subclasses to set a plugin ID
-    # associated with the task at large (if any element is related
-    # to the task).
-    #
-    # This will only be used in the child process running the task.
-    #
-    # The task ID helps keep messages in the frontend coherent
-    # in the case that multiple plugins log in the context of
-    # a single task (e.g. running integration commands should appear
-    # in the frontend for the element being built, not the element
-    # running the integration commands).
+    # This is called by Job subclasses to set the element
+    # key for the issuing message (if an element is related to the Job).
     #
     # Args:
-    #     task_id (int): The plugin identifier for this task
+    #     element_key (tuple): The element_key tuple to be supplied to the Message() constructor
     #
-    def set_task_id(self, task_id):
-        self._task_id = task_id
+    def set_message_element_key(self, element_key):
+        self._message_element_key = element_key
 
     # message():
     #
@@ -351,16 +343,18 @@ class Job():
     #    message_type (MessageType): The type of message to send
     #    message (str): The message
     #    kwargs: Remaining Message() constructor arguments, note that you can
-    #            override 'unique_id' this way.
+    #            override 'element_name' and 'element_key' this way.
     #
-    def message(self, message_type, message, **kwargs):
+    def message(self, message_type, message, element_name=None, element_key=None, **kwargs):
         kwargs['scheduler'] = True
-        unique_id = self._message_unique_id
-        if "unique_id" in kwargs:
-            unique_id = kwargs["unique_id"]
-            del kwargs["unique_id"]
+        # If default name & key values not provided, set as given job attributes
+        if element_name is None:
+            element_name = self._message_element_name
+        if element_key is None:
+            element_key = self._message_element_key
         self._scheduler.context.messenger.message(
-            Message(unique_id, message_type, message, **kwargs))
+            Message(message_type, message, element_name=element_name, element_key=element_key, **kwargs))
 
     #######################################################
     #                  Abstract Methods                   #
@@ -573,13 +567,16 @@ class Job():
 #                   that should be used - should contain {pid}.
 #    max_retries (int): The maximum number of retries.
 #    tries (int): The number of retries so far.
-#    message_unique_id (int): None, or the id to be supplied to the Message() constructor.
-#    task_id (int): None, or the plugin identifier for this job.
+#    message_element_name (str): None, or the plugin instance element name
+#                                to be supplied to the Message() constructor.
+#    message_element_key (tuple): None, or the element display key tuple
+#                                to be supplied to the Message() constructor.
 #
 class ChildJob():
 
     def __init__(
-            self, action_name, messenger, logdir, logfile, max_retries, tries, message_unique_id, task_id):
+            self, action_name, messenger, logdir, logfile, max_retries, tries,
+            message_element_name, message_element_key):
 
         self.action_name = action_name
 
@@ -588,8 +585,8 @@ class ChildJob():
         self._logfile = logfile
         self._max_retries = max_retries
         self._tries = tries
-        self._message_unique_id = message_unique_id
-        self._task_id = task_id
+        self._message_element_name = message_element_name
+        self._message_element_key = message_element_key
 
         self._queue = None
 
@@ -601,17 +598,20 @@ class ChildJob():
     # Args:
     #    message_type (MessageType): The type of message to send
     #    message (str): The message
-    #    kwargs: Remaining Message() constructor arguments, note that you can
-    #            override 'unique_id' this way.
+    #    kwargs: Remaining Message() constructor arguments, note
+    #            element_key is set in _child_message_handler
+    #            for frontend display if not already set or explicitly
+    #            overridden here.
     #
-    def message(self, message_type, message, **kwargs):
+    def message(self, message_type, message, element_name=None, element_key=None, **kwargs):
         kwargs['scheduler'] = True
-        unique_id = self._message_unique_id
-        if "unique_id" in kwargs:
-            unique_id = kwargs["unique_id"]
-            del kwargs["unique_id"]
-        self._messenger.message(
-            Message(unique_id, message_type, message, **kwargs))
+        # If default name & key values not provided, set as given job attributes
+        if element_name is None:
+            element_name = self._message_element_name
+        if element_key is None:
+            element_key = self._message_element_key
+        self._messenger.message(Message(message_type, message, element_name=element_name,
+                                        element_key=element_key, **kwargs))
 
     # send_message()
     #
@@ -844,6 +844,8 @@ class ChildJob():
     # frontend's main message handler in the context of a child task
     # and performs local logging to the local log file before sending
     # the message back to the parent process for further propagation.
+    # For an element child job, the related element display key is added
+    # to the message for widget rendering if not already set.
     #
     # Args:
     #    message     (Message): The message to log
@@ -852,7 +854,12 @@ class ChildJob():
     def _child_message_handler(self, message, is_silenced):
 
         message.action_name = self.action_name
-        message.task_id = self._task_id
+
+        # If no key has been set at this point, and the element job has
+        # a related key, set it. This is needed for messages going
+        # straight to the message handler from the child process.
+        if message.element_key is None and self._message_element_key:
+            message.element_key = self._message_element_key
 
         # Send to frontend if appropriate
         if is_silenced and (message.message_type not in unconditional_messages):
diff --git a/src/buildstream/_scheduler/queues/queue.py b/src/buildstream/_scheduler/queues/queue.py
index 79f1fa4..8c81ce2 100644
--- a/src/buildstream/_scheduler/queues/queue.py
+++ b/src/buildstream/_scheduler/queues/queue.py
@@ -341,7 +341,7 @@ class Queue():
     # a message for the element they are processing
     def _message(self, element, message_type, brief, **kwargs):
         context = element._get_context()
-        message = Message(element._unique_id, message_type, brief, **kwargs)
+        message = Message(message_type, brief, element_name=element._get_full_name(), **kwargs)
         context.messenger.message(message)
 
     def _element_log_path(self, element):
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 2dea1d4..4f668c6 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -28,7 +28,7 @@ from contextlib import contextmanager
 
 # Local imports
 from .resources import Resources, ResourceType
-from .jobs import JobStatus, CacheSizeJob, CleanupJob, ElementJob
+from .jobs import JobStatus, CacheSizeJob, CleanupJob
 from .._profile import Topics, PROFILER
 
 
@@ -64,13 +64,15 @@ _ACTION_NAME_CACHE_SIZE = 'size'
 #    state: The state that can be made available to the frontend
 #    interrupt_callback: A callback to handle ^C
#    ticker_callback: A callback called once per second
+#    interactive_failure: If the session is set to handle interactive failures
 #
 class Scheduler():
 
     def __init__(self, context,
                  start_time, state,
                  interrupt_callback=None,
-                 ticker_callback=None):
+                 ticker_callback=None,
+                 interactive_failure=False):
 
         #
         # Public members
@@ -92,6 +94,7 @@ class Scheduler():
         self._suspendtime = None              # Session time compensation for suspended state
         self._queue_jobs = True               # Whether we should continue to queue jobs
         self._state = state
+        self._interactive_failure = interactive_failure  # If the session is set to handle interactive failures
 
         # State of cache management related jobs
         self._cache_size_scheduled = False    # Whether we have a cache size job scheduled
@@ -253,10 +256,11 @@ class Scheduler():
 
         self._state.remove_task(job.action_name, job.name)
         if status == JobStatus.FAIL:
-            unique_id = None
-            if isinstance(job, ElementJob):
-                unique_id = job._element._unique_id
-            self._state.fail_task(job.action_name, job.name, unique_id)
+            # If it's an elementjob, we want to compare against the failure messages
+            # and send the Element() instance if interactive failure handling is
+            # enabled. Note this may change if the frontend runs in a separate
+            # process, for pickling reasons.
+            element = job._element if (job.element_job and self._interactive_failure) else None
+            self._state.fail_task(job.action_name, job.name, element_job=job.element_job, element=element)
 
         # Now check for more jobs
         self._sched()
diff --git a/src/buildstream/_state.py b/src/buildstream/_state.py
index 388ed81..2169467 100644
--- a/src/buildstream/_state.py
+++ b/src/buildstream/_state.py
@@ -208,8 +208,7 @@ class State():
     #    full_name (str): The full name of the task, distinguishing
     #                     it from other tasks with the same action name
     #                     e.g. an element's name.
-    #    unique_id (int): (optionally) the element's unique ID, if the failure
-    #                     came from an element
+    #    element_job (bool): (optionally) If an element job failed.
     #
     def register_task_failed_callback(self, callback):
         self._task_failed_cbs.append(callback)
@@ -324,11 +323,12 @@ class State():
     #    full_name (str): The full name of the task, distinguishing
     #                     it from other tasks with the same action name
     #                     e.g. an element's name.
-    #    unique_id (int): (optionally) the element's unique ID, if the failure came from an element
+    #    element_job (bool): (optionally) If an element job failed.
+    #    element (Element): (optionally) The element instance, if handling failures interactively
     #
-    def fail_task(self, action_name, full_name, unique_id=None):
+    def fail_task(self, action_name, full_name, element_job=False, element=None):
         for cb in self._task_failed_cbs:
-            cb(action_name, full_name, unique_id)
+            cb(action_name, full_name, element_job, element)
 
 
 # _Task
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index cbd635a..5f12889 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -55,13 +55,15 @@ from . import Scope, Consistency
 #    session_start_callback (callable): A callback to invoke when the session starts
 #    interrupt_callback (callable): A callback to invoke when we get interrupted
 #    ticker_callback (callable): Invoked every second while running the scheduler
+#    interactive_failure (bool): If the session is set to handle interactive failures
 #
 class Stream():
 
     def __init__(self, context, session_start, *,
                  session_start_callback=None,
                  interrupt_callback=None,
-                 ticker_callback=None):
+                 ticker_callback=None,
+                 interactive_failure=False):
 
         #
         # Public members
@@ -85,7 +87,8 @@ class Stream():
 
         self._scheduler = Scheduler(context, session_start, self._state,
                                     interrupt_callback=interrupt_callback,
-                                    ticker_callback=ticker_callback)
+                                    ticker_callback=ticker_callback,
+                                    interactive_failure=interactive_failure)
         self._first_non_track_queue = None
         self._session_start_callback = session_start_callback
 
@@ -1235,7 +1238,7 @@ class Stream():
     def _message(self, message_type, message, **kwargs):
         args = dict(kwargs)
         self._context.messenger.message(
-            Message(None, message_type, message, **args))
+            Message(message_type, message, **args))
 
     # _add_queue()
     #
diff --git a/src/buildstream/plugin.py b/src/buildstream/plugin.py
index 506eba5..35e80dd 100644
--- a/src/buildstream/plugin.py
+++ b/src/buildstream/plugin.py
@@ -428,6 +428,9 @@ class Plugin():
 
         Note: Informative messages tell the user something they might want
               to know, like if refreshing an element caused it to change.
+              The instance full name of the plugin is included with the
+              message, this being the name of the given element, as opposed
+              to the class name of the underlying plugin __kind identifier.
         """
         self.__message(MessageType.INFO, brief, detail=detail)
 
@@ -491,7 +494,7 @@ class Plugin():
               self.call(... command which takes time ...)
         """
         with self.__context.messenger.timed_activity(activity_name,
-                                                     unique_id=self._unique_id,
+                                                     element_name=self._get_full_name(),
                                                      detail=detail,
                                                      silent_nested=silent_nested):
             yield
@@ -733,7 +736,7 @@ class Plugin():
         return (exit_code, output)
 
     def __message(self, message_type, brief, **kwargs):
-        message = Message(self._unique_id, message_type, brief, **kwargs)
+        message = Message(message_type, brief, element_name=self._get_full_name(), **kwargs)
         self.__context.messenger.message(message)
 
     def __note_command(self, output, *popenargs, **kwargs):
@@ -761,10 +764,12 @@ class Plugin():
 
     def __set_full_name(self):
         project = self.__project
+        # Set the name, depending on element or source plugin type
+        name = self._element_name if self.__type_tag == "source" else self.name  # pylint: disable=no-member
         if project.junction:
-            return '{}:{}'.format(project.junction.name, self.name)
+            return '{}:{}'.format(project.junction.name, name)
         else:
-            return self.name
+            return name
 
 
 # A local table for _prefix_warning()
diff --git a/src/buildstream/sandbox/_sandboxremote.py b/src/buildstream/sandbox/_sandboxremote.py
index c84bfa4..bbd23fc 100644
--- a/src/buildstream/sandbox/_sandboxremote.py
+++ b/src/buildstream/sandbox/_sandboxremote.py
@@ -107,7 +107,7 @@ class SandboxRemote(Sandbox):
         self.operation_name = None
 
     def info(self, msg):
-        self._get_context().messenger.message(Message(None, MessageType.INFO, msg))
+        self._get_context().messenger.message(Message(MessageType.INFO, msg))
 
     @staticmethod
     def specs_from_config_node(config_node, basedir=None):
diff --git a/src/buildstream/sandbox/sandbox.py b/src/buildstream/sandbox/sandbox.py
index ece15c9..ac9d672 100644
--- a/src/buildstream/sandbox/sandbox.py
+++ b/src/buildstream/sandbox/sandbox.py
@@ -120,12 +120,12 @@ class Sandbox():
         self.__allow_real_directory = kwargs['allow_real_directory']
         self.__allow_run = True
 
-        # Plugin ID for logging
+        # Plugin element full name for logging
         plugin = kwargs.get('plugin', None)
         if plugin:
-            self.__plugin_id = plugin._unique_id
+            self.__element_name = plugin._get_full_name()
         else:
-            self.__plugin_id = None
+            self.__element_name = None
 
         # Configuration from kwargs common to all subclasses
         self.__config = kwargs['config']
@@ -563,12 +563,12 @@ class Sandbox():
 
         return False
 
-    # _get_plugin_id()
+    # _get_element_name()
     #
-    # Get the plugin's unique identifier
+    # Get the plugin's element full name
     #
-    def _get_plugin_id(self):
-        return self.__plugin_id
+    def _get_element_name(self):
+        return self.__element_name
 
     # _callback()
     #
@@ -622,8 +622,7 @@ class Sandbox():
     #    details (str): optional, more details
     def _issue_warning(self, message, detail=None):
         self.__context.messenger.message(
-            Message(None,
-                    MessageType.WARN,
+            Message(MessageType.WARN,
                     message,
                     detail=detail
                     )
@@ -649,7 +648,7 @@ class _SandboxBatch():
     def execute_group(self, group):
         if group.label:
             context = self.sandbox._get_context()
-            cm = context.messenger.timed_activity(group.label, unique_id=self.sandbox._get_plugin_id())
+            cm = context.messenger.timed_activity(group.label, element_name=self.sandbox._get_element_name())
         else:
             cm = contextlib.suppress()
 
@@ -659,8 +658,8 @@ class _SandboxBatch():
     def execute_command(self, command):
         if command.label:
             context = self.sandbox._get_context()
-            message = Message(self.sandbox._get_plugin_id(), MessageType.STATUS,
-                              'Running command', detail=command.label)
+            message = Message(MessageType.STATUS, 'Running command', detail=command.label,
+                              element_name=self.sandbox._get_element_name())
             context.messenger.message(message)
 
         exitcode = self.sandbox._run(command.command, self.flags, cwd=command.cwd, env=command.env)
diff --git a/src/buildstream/source.py b/src/buildstream/source.py
index b513fdb..c5fb614 100644
--- a/src/buildstream/source.py
+++ b/src/buildstream/source.py
@@ -308,10 +308,11 @@ class Source(Plugin):
 
     def __init__(self, context, project, meta, *, alias_override=None, unique_id=None):
         provenance = meta.config.get_provenance()
+        # Set element_name member before parent init, as needed for debug messaging
+        self.__element_name = meta.element_name         # The name of the element owning this source
         super().__init__("{}-{}".format(meta.element_name, meta.element_index),
                          context, project, provenance, "source", unique_id=unique_id)
 
-        self.__element_name = meta.element_name         # The name of the element owning this source
         self.__element_index = meta.element_index       # The index of the source in the owning element's source list
         self.__element_kind = meta.element_kind         # The kind of the element owning this source
         self.__directory = meta.directory               # Staging relative directory
@@ -1076,6 +1077,10 @@ class Source(Plugin):
         length = min(len(key), context.log_key_length)
         return key[:length]
 
+    @property
+    def _element_name(self):
+        return self.__element_name
+
     # _get_args_for_child_job_pickling(self)
     #
     # Return data necessary to reconstruct this object in a child job process.
diff --git a/tests/frontend/logging.py b/tests/frontend/logging.py
index 6a17bf7..462af82 100644
--- a/tests/frontend/logging.py
+++ b/tests/frontend/logging.py
@@ -124,7 +124,7 @@ def test_failed_build_listing(cli, datafiles):
         assert m.end() in failure_summary_range
     assert len(matches) == 3  # each element should be matched once.
 
-    # Note that if we mess up the 'unique_id' of Messages, they won't be printed
+    # Note that if we mess up the 'element_name' of Messages, they won't be printed
     # with the name of the relevant element, e.g. 'testfail-1.bst'. Check that
     # they have the name as expected.
     pattern = r"\[..:..:..\] FAILURE testfail-.\.bst: Staged artifacts do not provide command 'sh'"


[buildstream] 13/19: Move subprocess machinery into a method

Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch phil/ui-split-refactor
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 8b32793fa8ce95273ee034829166e0697363d00e
Author: Phil Dawson <ph...@codethink.co.uk>
AuthorDate: Fri Jul 5 11:26:04 2019 +0100

    Move subprocess machinery into a method
    
    This gets around the pickling identity issues.
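    
    A minimal sketch of the identity problem, assuming the 'spawn' start
    method (names mirror this branch; the snippet is illustrative, not
    the actual implementation):
    
        import functools
        import multiprocessing as mp
    
        def subprocessed(func):
            @functools.wraps(func)
            def wrapper(self, *args, **kwargs):
                ctx = mp.get_context('spawn')
                # 'spawn' must pickle the target. Pickle serializes a
                # plain function by its module-qualified name, but the
                # decorator has rebound Stream.build to this wrapper, so
                # the lookup no longer resolves to 'func' and pickling
                # fails: "it's not the same object as ...Stream.build".
                target = functools.partial(func, self, *args, **kwargs)
                proc = ctx.Process(target=target)
                proc.start()
                proc.join()
            return wrapper
    
        class Stream():
            @subprocessed
            def build(self, targets):
                print("building", targets)
    
    Dispatching from an ordinary method (run_in_subprocess) and passing
    the undecorated function as the target sidesteps the mismatch.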
---
 src/buildstream/_stream.py | 128 ++++++++++++++++++++++++++++++---------------
 1 file changed, 87 insertions(+), 41 deletions(-)

diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 0fe2234..e4f6bc0 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -19,7 +19,6 @@
 #        Jürg Billeter <ju...@codethink.co.uk>
 #        Tristan Maat <tr...@codethink.co.uk>
 
-import asyncio
 import itertools
 import functools
 import multiprocessing as mp
@@ -48,38 +47,44 @@ from . import utils, _yaml, _site
 from . import Scope, Consistency
 
 
-# A decorator which runs the decorated method to be run in a subprocess
-def subprocessed(func):
-
-    @functools.wraps(func)
-    def _subprocessed(self, *args, **kwargs):
-        assert self
-        print("Args: {}".format([*args]))
-        print("Kwargs: {}".format(list(kwargs.items())))
-        assert not self._subprocess
-
-        # TODO use functools to pass arguments to func to make target for subprocess
-
-        # Start subprocessed work
-        mp_context = mp.get_context(method='spawn')
-        process_name = "stream-{}".format(func.__name__)
-        target = functools.partial(func, self, *args, **kwargs)
-        print("launching subprocess:", process_name)
-        self._subprocess = mp_context.Process(target=target, name=process_name)
-        self._subprocess.start()
-
-        # TODO connect signal handlers
-
-        # Run event loop. This event loop should exit once the
-        # subprocessed work has completed
-        print("Starting loop...")
-        while not self._subprocess.exitcode:
-            self._loop()
-        print("Stopping loop...")
-
-        # Return result of subprocessed function
-
-    return _subprocessed
+def _subprocessed(self, *args, **kwargs):
+    assert self
+    print("Args: {}".format([*args]))
+    print("Kwargs: {}".format(list(kwargs.items())))
+    assert not self._subprocess
+
+    global notification_count 
+    notification_count = 0
+    # TODO use functools to pass arguments to func to make target for subprocess
+
+    # Start subprocessed work
+    mp_context = mp.get_context(method='spawn')
+    process_name = "stream-{}".format(func.__name__)
+    print("launchinglaunching subprocess:", process_name)
+    print(func.__module__, func.__name__)
+    import buildstream
+    try:
+        assert func is buildstream._stream.Stream.build or func is Stream.build
+    except AssertionError:
+        print(func, func.__qualname__, func.__name__, func.__module__, id(func))
+    self._subprocess = mp_context.Process(target=func, args=args, kwargs=kwargs, name=process_name)
+    self._subprocess.start()
+
+    # TODO connect signal handlers
+
+    self._subprocess = mp_context.Process(target=func, args=args, kwargs=kwargs, name=process_name)
+
+    print("Starting loop...")
+    while not self._subprocess.exitcode:
+        self._loop()
+    print("Stopping loop...")
+
+    try:
+        while True:
+            notification = self.notification_queue.get()
+            self._scheduler_notification_handler(notification)
+    except queue.Empty:
+        pass
 
 
 # Stream()
@@ -141,6 +146,46 @@ class Stream():
     def init(self):
         self._artifacts = self._context.artifactcache
         self._sourcecache = self._context.sourcecache
+        print(Stream.build, Stream.build.__qualname__, Stream.build.__name__, Stream.build.__module__, id(Stream.build))
+
+
+    def run_in_subprocess(self, func, *args, **kwargs):
+        print("Args: {}".format([*args]))
+        print("Kwargs: {}".format(list(kwargs.items())))
+        assert not self._subprocess
+
+        global notification_count 
+        notification_count = 0
+        # TODO use functools to pass arguments to func to make target for subprocess
+
+        # Start subprocessed work
+        mp_context = mp.get_context(method='fork')
+        process_name = "stream-{}".format(func.__name__)
+        print("launchinglaunching subprocess:", process_name)
+        print(func.__module__, func.__name__)
+        import buildstream
+        try:
+            assert func is buildstream._stream.Stream.build or func is Stream.build
+        except AssertionError:
+            print(func, func.__qualname__, func.__name__, func.__module__, id(func))
+        self._subprocess = mp_context.Process(target=func, args=args, kwargs=kwargs, name=process_name)
+        self._subprocess.start()
+
+        # TODO connect signal handlers
+
+        self._subprocess = mp_context.Process(target=func, args=args, kwargs=kwargs, name=process_name)
+
+        print("Starting loop...")
+        while not self._subprocess.exitcode:
+            self._loop()
+        print("Stopping loop...")
+
+        try:
+            while True:
+                notification = self.notification_queue.get()
+                self._scheduler_notification_handler(notification)
+        except queue.Empty:
+            pass
 
     # cleanup()
     #
@@ -265,6 +310,8 @@ class Stream():
         return element._shell(scope, directory, mounts=mounts, isolate=isolate, prompt=prompt, command=command,
                               usebuildtree=buildtree)
 
+    def build(self, *args, **kwargs):
+        self.run_in_subprocess(self._build, *args, **kwargs)
     # build()
     #
     # Builds (assembles) elements in the pipeline.
@@ -282,14 +329,13 @@ class Stream():
     # If `remote` specified as None, then regular configuration will be used
     # to determine where to push artifacts to.
     #
-    @subprocessed
-    def build(self, targets, *,
-              track_targets=None,
-              track_except=None,
-              track_cross_junctions=False,
-              ignore_junction_targets=False,
-              build_all=False,
-              remote=None):
+    def _build(self, targets, *,
+               track_targets=None,
+               track_except=None,
+               track_cross_junctions=False,
+               ignore_junction_targets=False,
+               build_all=False,
+               remote=None):
 
         if build_all:
             selection = PipelineSelection.ALL


[buildstream] 14/19: fixup! Move subprocess machinery into a method

Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch phil/ui-split-refactor
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit f39f4ef0e541f50911fb695e7848a2e6482f2011
Author: Phil Dawson <ph...@codethink.co.uk>
AuthorDate: Fri Jul 5 14:21:30 2019 +0100

    fixup! Move subprocess machinery into a method
---
 src/buildstream/_stream.py | 94 +++++++++++-----------------------------------
 1 file changed, 22 insertions(+), 72 deletions(-)

diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index e4f6bc0..1de8364 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -47,46 +47,6 @@ from . import utils, _yaml, _site
 from . import Scope, Consistency
 
 
-def _subprocessed(self, *args, **kwargs):
-    assert self
-    print("Args: {}".format([*args]))
-    print("Kwargs: {}".format(list(kwargs.items())))
-    assert not self._subprocess
-
-    global notification_count 
-    notification_count = 0
-    # TODO use functools to pass arguments to func to make target for subprocess
-
-    # Start subprocessed work
-    mp_context = mp.get_context(method='spawn')
-    process_name = "stream-{}".format(func.__name__)
-    print("launchinglaunching subprocess:", process_name)
-    print(func.__module__, func.__name__)
-    import buildstream
-    try:
-        assert func is buildstream._stream.Stream.build or func is Stream.build
-    except AssertionError:
-        print(func, func.__qualname__, func.__name__, func.__module__, id(func))
-    self._subprocess = mp_context.Process(target=func, args=args, kwargs=kwargs, name=process_name)
-    self._subprocess.start()
-
-    # TODO connect signal handlers
-
-    self._subprocess = mp_context.Process(target=func, args=args, kwargs=kwargs, name=process_name)
-
-    print("Starting loop...")
-    while not self._subprocess.exitcode:
-        self._loop()
-    print("Stopping loop...")
-
-    try:
-        while True:
-            notification = self.notification_queue.get()
-            self._scheduler_notification_handler(notification)
-    except queue.Empty:
-        pass
-
-
 # Stream()
 #
 # This is the main, toplevel calling interface in BuildStream core.
@@ -146,15 +106,15 @@ class Stream():
     def init(self):
         self._artifacts = self._context.artifactcache
         self._sourcecache = self._context.sourcecache
-        print(Stream.build, Stream.build.__qualname__, Stream.build.__name__, Stream.build.__module__, id(Stream.build))
-
+        print(Stream.build, Stream.build.__qualname__, Stream.build.__name__, Stream.build.__module__,
+              id(Stream.build))
 
     def run_in_subprocess(self, func, *args, **kwargs):
         print("Args: {}".format([*args]))
         print("Kwargs: {}".format(list(kwargs.items())))
         assert not self._subprocess
 
-        global notification_count 
+        global notification_count
         notification_count = 0
         # TODO use functools to pass arguments to func to make target for subprocess
 
@@ -162,30 +122,22 @@ class Stream():
         mp_context = mp.get_context(method='fork')
         process_name = "stream-{}".format(func.__name__)
         print("launchinglaunching subprocess:", process_name)
-        print(func.__module__, func.__name__)
-        import buildstream
-        try:
-            assert func is buildstream._stream.Stream.build or func is Stream.build
-        except AssertionError:
-            print(func, func.__qualname__, func.__name__, func.__module__, id(func))
         self._subprocess = mp_context.Process(target=func, args=args, kwargs=kwargs, name=process_name)
         self._subprocess.start()
 
         # TODO connect signal handlers
-
-        self._subprocess = mp_context.Process(target=func, args=args, kwargs=kwargs, name=process_name)
-
-        print("Starting loop...")
-        while not self._subprocess.exitcode:
+        while self._subprocess.exitcode is not None:
+            self._subprocess.join(0.1)
             self._loop()
         print("Stopping loop...")
 
-        try:
-            while True:
-                notification = self.notification_queue.get()
-                self._scheduler_notification_handler(notification)
-        except queue.Empty:
-            pass
+        # try:
+        #     while True:
+        #         notification = self._notification_queue.get_nowait()
+        #         self._scheduler_notification_handler(notification)
+        # except queue.Empty:
+        #     print("Finished processing notifications")
+        #     pass
 
     # cleanup()
     #
@@ -312,6 +264,7 @@ class Stream():
 
     def build(self, *args, **kwargs):
         self.run_in_subprocess(self._build, *args, **kwargs)
+
     # build()
     #
     # Builds (assembles) elements in the pipeline.
@@ -1701,15 +1654,12 @@ class Stream():
     # work to a subprocess with the @subprocessed decorator
     def _loop(self):
         assert self._notification_queue
-
-        # Check for new messages
-        try:
-            notification = self._notification_queue.get(block=True, timeout=0.1)
-        except queue.Empty:
-            notification = None
-            print("queue empty, continuing...")
-
-        # Process new messages
-        if notification:
-            print("handling notifications")
-            self._scheduler_notification_handler(notification)
+        # Check for and process new messages
+        while True:
+            try:
+                notification = self._notification_queue.get_nowait()
+                print("handling notifications")
+                self._scheduler_notification_handler(notification)
+            except queue.Empty:
+                notification = None
+                break
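
The final hunk above settles on a non-blocking drain: keep calling
get_nowait() until the queue reports empty. A self-contained sketch of that
pattern with illustrative names (note that a multiprocessing queue can
briefly report empty right after a put while its feeder thread flushes,
hence the small sleep in the demo):

    import multiprocessing as mp
    import queue
    import time

    def drain(notification_queue):
        # Pull everything currently queued without blocking; stop as
        # soon as the queue reports empty.
        while True:
            try:
                notification = notification_queue.get_nowait()
            except queue.Empty:
                break
            print("handling notification:", notification)

    if __name__ == '__main__':
        q = mp.Queue()
        for n in ("job_start", "tick", "job_complete"):
            q.put(n)
        time.sleep(0.1)  # let the feeder thread flush the puts
        drain(q)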


[buildstream] 04/19: _message.py: Use element_name & element_key instead of unique_id

Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch phil/ui-split-refactor
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit b9bcb00b1719396c40b75b98d9c50de83ddbf42a
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Wed Jul 31 16:31:47 2019 +0100

    _message.py: Use element_name & element_key instead of unique_id
    
    Adding the element full name and display key into all element related
    messages removes the need to look up the plugintable via a plugin
    unique_id just to retrieve the same values for logging and widget
    frontend display. Relying on plugintable state is also incompatible
    if the frontend will be running in a different process, as it will
    exist in multiple states. An Element instance is passed exclusively
    if handling interactive failures.
    
    The element full name is now displayed instead of the unique_id,
    such as in the debugging widget. It is also displayed in place of
    'name' (i.e including any junction prepend) to be more informative.
---
 src/buildstream/_frontend/app.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/src/buildstream/_frontend/app.py b/src/buildstream/_frontend/app.py
index e1b1d8d..76e3bc7 100644
--- a/src/buildstream/_frontend/app.py
+++ b/src/buildstream/_frontend/app.py
@@ -586,6 +586,7 @@ class App():
                                "unable to retrieve failure message for element {}\n\n\n\n\n"
                                .format(full_name), err=True)
                 else:
+                    # Note element will be given if in interactive failure mode
                     self._handle_failure(element, queue, failure)
 
             else:
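
The idea in this commit lends itself to a small sketch: if a message carries
the element's full name and display key itself, a consumer in another
process can render it without any plugin-table lookup. The class and fields
below are hypothetical and much reduced from the real _message.py:

    # Hypothetical, reduced message type carrying display data directly.
    class Message():

        def __init__(self, message_type, text, *,
                     element_name=None, element_key=None):
            self.message_type = message_type
            self.text = text
            self.element_name = element_name
            self.element_key = element_key

    def render(message):
        # No unique_id lookup needed: everything is on the message.
        prefix = message.element_name or "<global>"
        return "[{}] {}: {}".format(prefix, message.message_type, message.text)

    print(render(Message("info", "build started",
                         element_name="junction.bst:hello.bst",
                         element_key="a1b2c3")))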


[buildstream] 09/19: Pseudocode for subprocess context manager

Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch phil/ui-split-refactor
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit e0561a73703dfbb0ec6c82bbdafc63ccea2997ad
Author: Phil Dawson <ph...@codethink.co.uk>
AuthorDate: Thu Jun 20 11:12:30 2019 +0100

    Pseudocode for subprocess context manager
---
 src/buildstream/_stream.py | 14 ++++++++++++++
 1 file changed, 14 insertions(+)

diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 2d451c6..4a1edcf 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -1609,3 +1609,17 @@ class Stream():
         # a new use-case arises.
         #
         raise TypeError("Stream objects should not be pickled.")
+
+    # TODO
+    # Causes the decorated method to be run in a subprocess
+    @contextmanager
+    def subprocessed(self, func, *args, **kwargs):
+        pass
+        # Set up event loop
+
+        # Start subprocessed work
+
+        # Run event loop. This event loop should exit once the
+        # subprocessed work has completed
+
+        # Return result of subprocessed function
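
One way the pseudocode above could be fleshed out, as a sketch only: start
the work in a child process, yield so the caller can pump its event loop,
and join on exit. The names and the 'fork' start method are assumptions,
not the eventual implementation:

    import multiprocessing as mp
    from contextlib import contextmanager

    @contextmanager
    def subprocessed(func, *args, **kwargs):
        process = mp.get_context('fork').Process(
            target=func, args=args, kwargs=kwargs)
        process.start()
        try:
            yield process      # caller drives its event loop here
        finally:
            process.join()     # wait for the subprocessed work

    def work(n):
        print("working on", n)

    if __name__ == '__main__':
        with subprocessed(work, 42):
            pass  # a real caller would process notifications here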


[buildstream] 07/19: Refer to stream-scheduler communication as notifications

Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch phil/ui-split-refactor
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit ed0eabe3ccac556f5ebe3783cb8538bba5ab0f16
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Tue Jun 18 13:02:15 2019 +0100

    Refer to stream-scheduler communication as notifications
---
 src/buildstream/_scheduler/scheduler.py | 38 ++++++++++++++++-----------------
 1 file changed, 19 insertions(+), 19 deletions(-)

diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 5bf0764..9e120e4 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -98,7 +98,7 @@ class Notification:
 class Scheduler():
 
     def __init__(self, context,
-                 start_time, state, message_handler,
+                 start_time, state, notification_handler,
                  interrupt_callback=None,
                  ticker_callback=None,
                  interactive_failure=False):
@@ -131,8 +131,8 @@ class Scheduler():
         self._cleanup_scheduled = False       # Whether we have a cleanup job scheduled
         self._cleanup_running = None          # A running CleanupJob, or None
 
-        # Callback to send messages to report back to the Scheduler's owner
-        self.message = message_handler
+        # Callback to send notifications to report back to the Scheduler's owner
+        self.notify = notification_handler
 
         # Whether our exclusive jobs, like 'cleanup' are currently already
         # waiting or active.
@@ -299,13 +299,13 @@ class Scheduler():
             # this may change if the frontend is run in a separate process for pickling
             element = job._element if (job.element_job and self._interactive_failure) else None
 
-            message = Notification(NotificationType.JOB_COMPLETE,
-                                   full_name=job.name,
-                                   job_action=job.action_name,
-                                   job_status=status,
-                                   failed_element=job.element_job,
-                                   element=element)
-            self.message(message)
+        notification = Notification(NotificationType.JOB_COMPLETE,
+                                    full_name=job.name,
+                                    job_action=job.action_name,
+                                    job_status=status,
+                                    failed_element=job.element_job,
+                                    element=element)
+        self.notify(notification)
 
         if process_jobs:
             # Now check for more jobs
@@ -365,11 +365,11 @@ class Scheduler():
     #
     def _start_job(self, job):
         self._active_jobs.append(job)
-        message = Notification(NotificationType.JOB_START,
-                               full_name=job.name,
-                               job_action=job.action_name,
-                               elapsed_time=self.elapsed_time())
-        self.message(message)
+        notification = Notification(NotificationType.JOB_START,
+                                    full_name=job.name,
+                                    job_action=job.action_name,
+                                    elapsed_time=self.elapsed_time())
+        self.notify(notification)
         job.start()
 
     # Callback for the cache size job
@@ -588,8 +588,8 @@ class Scheduler():
         if self.terminated:
             return
 
-        message = Notification(NotificationType.INTERRUPT)
-        self.message(message)
+        notification = Notification(NotificationType.INTERRUPT)
+        self.notify(notification)
 
     # _terminate_event():
     #
@@ -648,8 +648,8 @@ class Scheduler():
 
     # Regular timeout for driving status in the UI
     def _tick(self):
-        message = Notification(NotificationType.TICK)
-        self.message(message)
+        notification = Notification(NotificationType.TICK)
+        self.notify(notification)
         self.loop.call_later(1, self._tick)
 
     def __getstate__(self):


[buildstream] 11/19: WIP: Add a way to run stream methods in a subprocess

Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch phil/ui-split-refactor
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit d12e62d7aeb8595133b0c732a4b0038ae2479a24
Author: Phil Dawson <ph...@codethink.co.uk>
AuthorDate: Thu Jul 4 09:55:51 2019 +0100

    WIP: Add a way to run stream methods in a subprocess
    
    Currently the frontend is getting stuck in an infinite loop.
    
    Also contains debugging print statements which will need to be
    stripped out.
---
 src/buildstream/_frontend/app.py        | 153 ++++++++++++++++----------------
 src/buildstream/_scheduler/scheduler.py |   6 +-
 src/buildstream/_stream.py              |  67 ++++++++++----
 3 files changed, 129 insertions(+), 97 deletions(-)

diff --git a/src/buildstream/_frontend/app.py b/src/buildstream/_frontend/app.py
index 9f90938..90070af 100644
--- a/src/buildstream/_frontend/app.py
+++ b/src/buildstream/_frontend/app.py
@@ -596,84 +596,83 @@ class App():
 
     def _handle_failure(self, element, queue, failure):
 
-        # Handle non interactive mode setting of what to do when a job fails.
-        if not self._interactive_failures:
+        # # Handle non interactive mode setting of what to do when a job fails.
+        # if not self._interactive_failures:
 
-            if self.context.sched_error_action == _SchedulerErrorAction.TERMINATE:
-                self.stream.terminate()
-            elif self.context.sched_error_action == _SchedulerErrorAction.QUIT:
-                self.stream.quit()
-            elif self.context.sched_error_action == _SchedulerErrorAction.CONTINUE:
-                pass
-            return
-
-        assert False
-        # Interactive mode for element failures
-        with self._interrupted():
-
-            summary = ("\n{} failure on element: {}\n".format(failure.action_name, element.name) +
-                       "\n" +
-                       "Choose one of the following options:\n" +
-                       "  (c)ontinue  - Continue queueing jobs as much as possible\n" +
-                       "  (q)uit      - Exit after all ongoing jobs complete\n" +
-                       "  (t)erminate - Terminate any ongoing jobs and exit\n" +
-                       "  (r)etry     - Retry this job\n")
-            if failure.logfile:
-                summary += "  (l)og       - View the full log file\n"
-            if failure.sandbox:
-                summary += "  (s)hell     - Drop into a shell in the failed build sandbox\n"
-            summary += "\nPressing ^C will terminate jobs and exit\n"
-
-            choices = ['continue', 'quit', 'terminate', 'retry']
-            if failure.logfile:
-                choices += ['log']
-            if failure.sandbox:
-                choices += ['shell']
-
-            choice = ''
-            while choice not in ['continue', 'quit', 'terminate', 'retry']:
-                click.echo(summary, err=True)
-
-                self._notify("BuildStream failure", "{} on element {}"
-                             .format(failure.action_name, element.name))
-
-                try:
-                    choice = click.prompt("Choice:", default='continue', err=True,
-                                          value_proc=_prefix_choice_value_proc(choices))
-                except click.Abort:
-                    # Ensure a newline after automatically printed '^C'
-                    click.echo("", err=True)
-                    choice = 'terminate'
-
-                # Handle choices which you can come back from
-                #
-                assert choice != 'shell'  # This won't work for now
-                if choice == 'shell':
-                    click.echo("\nDropping into an interactive shell in the failed build sandbox\n", err=True)
-                    try:
-                        prompt = self.shell_prompt(element)
-                        self.stream.shell(element, Scope.BUILD, prompt, isolate=True, usebuildtree='always')
-                    except BstError as e:
-                        click.echo("Error while attempting to create interactive shell: {}".format(e), err=True)
-                elif choice == 'log':
-                    with open(failure.logfile, 'r') as logfile:
-                        content = logfile.read()
-                        click.echo_via_pager(content)
-
-            if choice == 'terminate':
-                click.echo("\nTerminating all jobs\n", err=True)
-                self.stream.terminate()
-            else:
-                if choice == 'quit':
-                    click.echo("\nCompleting ongoing tasks before quitting\n", err=True)
-                    self.stream.quit()
-                elif choice == 'continue':
-                    click.echo("\nContinuing with other non failing elements\n", err=True)
-                elif choice == 'retry':
-                    click.echo("\nRetrying failed job\n", err=True)
-                    # FIXME: Outstandingly nasty modification of core state
-                    queue._task_group.failed_tasks.remove(element._get_full_name())
-                    queue.enqueue([element])
+        if self.context.sched_error_action == 'terminate':
+            self.stream.terminate()
+        elif self.context.sched_error_action == 'quit':
+            self.stream.quit()
+        elif self.context.sched_error_action == 'continue':
+            pass
+        return
+
+        # assert False
+        # # Interactive mode for element failures
+        # with self._interrupted():
+
+        #     summary = ("\n{} failure on element: {}\n".format(failure.action_name, element.name) +
+        #                "\n" +
+        #                "Choose one of the following options:\n" +
+        #                "  (c)ontinue  - Continue queueing jobs as much as possible\n" +
+        #                "  (q)uit      - Exit after all ongoing jobs complete\n" +
+        #                "  (t)erminate - Terminate any ongoing jobs and exit\n" +
+        #                "  (r)etry     - Retry this job\n")
+        #     if failure.logfile:
+        #         summary += "  (l)og       - View the full log file\n"
+        #     if failure.sandbox:
+        #         summary += "  (s)hell     - Drop into a shell in the failed build sandbox\n"
+        #     summary += "\nPressing ^C will terminate jobs and exit\n"
+
+        #     choices = ['continue', 'quit', 'terminate', 'retry']
+        #     if failure.logfile:
+        #         choices += ['log']
+        #     if failure.sandbox:
+        #         choices += ['shell']
+
+        #     choice = ''
+        #     while choice not in ['continue', 'quit', 'terminate', 'retry']:
+        #         click.echo(summary, err=True)
+
+        #         self._notify("BuildStream failure", "{} on element {}"
+        #                      .format(failure.action_name, element.name))
+
+        #         try:
+        #             choice = click.prompt("Choice:", default='continue', err=True,
+        #                                   value_proc=_prefix_choice_value_proc(choices))
+        #         except click.Abort:
+        #             # Ensure a newline after automatically printed '^C'
+        #             click.echo("", err=True)
+        #             choice = 'terminate'
+
+        #         # Handle choices which you can come back from
+        #         #
+        #         assert choice != 'shell'  # This won't work for now
+        #         if choice == 'shell':
+        #             click.echo("\nDropping into an interactive shell in the failed build sandbox\n", err=True)
+        #             try:
+        #                 prompt = self.shell_prompt(element)
+        #                 self.stream.shell(element, Scope.BUILD, prompt, isolate=True, usebuildtree='always')
+        #             except BstError as e:
+        #                 click.echo("Error while attempting to create interactive shell: {}".format(e), err=True)
+        #         elif choice == 'log':
+        #             with open(failure.logfile, 'r') as logfile:
+        #                 content = logfile.read()
+        #                 click.echo_via_pager(content)
+
+        #     if choice == 'terminate':
+        #         click.echo("\nTerminating all jobs\n", err=True)
+        #         self.stream.terminate()
+        #     else:
+        #         if choice == 'quit':
+        #             click.echo("\nCompleting ongoing tasks before quitting\n", err=True)
+        #             self.stream.quit()
+        #         elif choice == 'continue':
+        #             click.echo("\nContinuing with other non failing elements\n", err=True)
+        #         elif choice == 'retry':
+        #             click.echo("\nRetrying failed job\n", err=True)
+        #             queue.failed_elements.remove(element)
+        #             queue.enqueue([element])
 
     #
     # Print the session heading if we've loaded a pipeline and there
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 14ecf30..2e9c740 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -52,6 +52,7 @@ class NotificationType(enum.Enum):
     JOB_START = "job_start"
     JOB_COMPLETE = "job_complete"
     TICK = "tick"
+    EXCEPTION = "exception"
 
 
 class Notification:
@@ -64,7 +65,9 @@ class Notification:
                  job_status=None,
                  failed_element=False,
                  elapsed_time=None,
-                 element=None):
+                 element=None,
+                 exception=None):
+
         self.notification_type = notification_type
         self.full_name = full_name
         self.job_action = job_action
@@ -72,6 +75,7 @@ class Notification:
         self.failed_element = failed_element
         self.elapsed_time = elapsed_time
         self.element = element
+        self.exception = exception
 
 
 # Scheduler()
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index e8b3931..d2a4d08 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -19,6 +19,7 @@
 #        Jürg Billeter <ju...@codethink.co.uk>
 #        Tristan Maat <tr...@codethink.co.uk>
 
+import asyncio
 import itertools
 import functools
 import multiprocessing as mp
@@ -31,6 +32,7 @@ import tarfile
 import tempfile
 from contextlib import contextmanager, suppress
 from fnmatch import fnmatch
+import queue
 
 from ._artifact import Artifact
 from ._artifactelement import verify_artifact_ref, ArtifactElement
@@ -46,6 +48,40 @@ from . import utils, _yaml, _site
 from . import Scope, Consistency
 
 
+# A decorator which causes the decorated method to be run in a subprocess
+def subprocessed(func):
+
+    @functools.wraps(func)
+    def _subprocessed(self, *args, **kwargs):
+        assert self
+        print("Args: {}".format([*args]))
+        print("Kwargs: {}".format(list(kwargs.items())))
+        assert not self._subprocess
+
+        # TODO use functools to pass arguments to func to make target for subprocess
+
+        # Start subprocessed work
+        mp_context = mp.get_context(method='spawn')
+        process_name = "stream-{}".format(func.__name__)
+        target = functools.partial(func, self, *args, **kwargs)
+        print("launching subprocess:", process_name)
+        self._subprocess = mp_context.Process(target=target, name=process_name)
+        self._subprocess.run()
+
+        # TODO connect signal handlers
+
+        # Run event loop. This event loop should exit once the
+        # subprocessed work has completed
+        print("Starting loop...")
+        while not self._subprocess.exitcode:
+            self._loop()
+        print("Stopping loop...")
+
+        # Return result of subprocessed function
+
+    return _subprocessed
+
+
 # Stream()
 #
 # This is the main, toplevel calling interface in BuildStream core.
@@ -77,6 +113,7 @@ class Stream():
         #
         # Private members
         #
+        self._subprocess = None
         self._notification_queue = mp.Queue()
         self._context = context
         self._artifacts = None
@@ -245,6 +282,7 @@ class Stream():
     # If `remote` specified as None, then regular configuration will be used
     # to determine where to push artifacts to.
     #
+    @subprocessed
     def build(self, targets, *,
               track_targets=None,
               track_except=None,
@@ -1592,6 +1630,9 @@ class Stream():
                 else:
                     unique_id = None
                 self._state.fail_task(notification.job_action, notification.full_name, unique_id)
+        elif notification.notification_type == NotificationType.EXCEPTION:
+            # TODO
+            pass
         else:
             raise StreamError("Unreccognised notification type recieved")
 
@@ -1610,31 +1651,19 @@ class Stream():
         #
         raise TypeError("Stream objects should not be pickled.")
 
-    # TODO
-    # Causes the decorated method to be run in a subprocess
-    @contextmanager
-    def subprocessed(self, func, *args, **kwargs):
-        pass
-        # Set up event loop
-
-        # Start subprocessed work
-
-        # Run event loop. This event loop should exit once the
-        # subprocessed work has completed
-
-        # Return result of subprocessed function
-
     # The code to be run by the Stream's event loop while delegating
-    # work to a subprocess with the @subprocessed
+    # work to a subprocess with the @subprocessed decorator
     def _loop(self):
         assert self._notification_queue
-        # Check that the subprocessed work has not finished
-        # TODO
 
         # Check for new messages
-        notification = self._notification_queue.get(block=True, timeout=0.1)
+        try:
+            notification = self._notification_queue.get(block=True, timeout=0.1)
+        except queue.Empty:
+            notification = None
+            print("queue empty, continuing...")
 
         # Process new messages
         if notification:
+            print("handling notifications")
             self._scheduler_notification_handler(notification)
-
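
The infinite loop mentioned above follows from how Process.exitcode behaves:
it is None while the child runs and 0 on a clean exit, so
`while not self._subprocess.exitcode` keeps spinning after a successful run
(`not 0` is true). Testing `is None`, as later commits on this branch do,
distinguishes "still running" from "exited successfully". A minimal
demonstration:

    import multiprocessing as mp

    def child():
        pass  # exits cleanly, so exitcode becomes 0

    if __name__ == '__main__':
        process = mp.get_context('fork').Process(target=child)
        process.start()
        # `while not process.exitcode:` would never end here;
        # `is None` stops as soon as the child has exited.
        while process.exitcode is None:
            process.join(0.1)
        print("child exited with", process.exitcode)  # prints 0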


[buildstream] 01/19: element.py: Remove redundant second call to _get_cache_key()

Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch phil/ui-split-refactor
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 456da5723d447789260a7c970dcfca736567e240
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Wed Jul 31 15:39:15 2019 +0100

    element.py: Remove redundant second call to _get_cache_key()
---
 src/buildstream/element.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/buildstream/element.py b/src/buildstream/element.py
index 21c38bc..a4404fc 100644
--- a/src/buildstream/element.py
+++ b/src/buildstream/element.py
@@ -1242,7 +1242,7 @@ class Element(Plugin):
 
         if not cache_key:
             cache_key = "{:?<64}".format('')
-        elif self._get_cache_key() == self.__strict_cache_key:
+        elif cache_key == self.__strict_cache_key:
             # Strong cache key used in this session matches cache key
             # that would be used in strict build mode
             dim_key = False


[buildstream] 19/19: Add workarounds for queue querying in main process

Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch phil/ui-split-refactor
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit dc863015320f1ff9b749a678a2a1ce4b83c006e0
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Tue Jul 16 16:48:38 2019 +0100

    Add workarounds for queue querying in main process
---
 src/buildstream/_frontend/app.py        |  3 ++-
 src/buildstream/_scheduler/scheduler.py | 29 ++++++++++++++++++-----------
 src/buildstream/_stream.py              | 13 ++++++-------
 3 files changed, 26 insertions(+), 19 deletions(-)

diff --git a/src/buildstream/_frontend/app.py b/src/buildstream/_frontend/app.py
index 90070af..7fd10b4 100644
--- a/src/buildstream/_frontend/app.py
+++ b/src/buildstream/_frontend/app.py
@@ -570,8 +570,9 @@ class App():
         if not self.stream.terminated:
             if element_job:
                 # look-up queue
+                # Issue with pickling a queue object, so for now only pass action names
                 for q in self.stream.queues:
-                    if q.action_name == action_name:
+                    if q == action_name:
                         queue = q
                 assert queue, "Job action {} does not have a corresponding queue".format(action_name)
 
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 6649865..44ebeef 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -26,6 +26,7 @@ from itertools import chain
 import signal
 import datetime
 from contextlib import contextmanager
+import queue
 
 # Local imports
 from .resources import Resources, ResourceType
@@ -55,6 +56,7 @@ class NotificationType(enum.Enum):
     EXCEPTION = "exception"
     TASK_ERROR = "task_error"
     SCHED_TERMINATE = "sched_terminate"
+    QUEUES = "queues"
 
 
 class Notification:
@@ -70,7 +72,8 @@ class Notification:
                  element=None,
                  exception=None,
                  domain=None,
-                 reason=None):
+                 reason=None,
+                 queues=None):
 
         self.notification_type = notification_type
         self.full_name = full_name
@@ -82,6 +85,7 @@ class Notification:
         self.exception = exception
         self.domain = domain
         self.reason = reason
+        self.queues = queues
 
 
 # Scheduler()
@@ -175,6 +179,14 @@ class Scheduler():
         # Hold on to the queues to process
         self.queues = queues
 
+        # Report to the main process which queues are in session,
+        # for now a list of action_names as pickling queues is
+        # causing errors. Will need actual queue object or bidirectional
+        # notification queue for error handling later.
+        queue_list = [q.action_name for q in self.queues]
+        notification = Notification(NotificationType.QUEUES, queues=queue_list)
+        self._notify(notification)
+
         # Ensure that we have a fresh new event loop, in case we want
         # to run another test in this thread.
         self.loop = asyncio.new_event_loop()
@@ -294,20 +306,17 @@ class Scheduler():
     #    queue (Queue): The Queue holding a complete job
     #    job (Job): The completed Job
     #    status (JobStatus): The status of the completed job
-    #    process_jobs (bool): If the scheduler should also process the
-    #                         job, else just generate the notification
     #
-    def job_completed(self, job, status, process_jobs=True):
+    def job_completed(self, job, status):
 
-        if process_jobs:
-            # Remove from the active jobs list
-            self._active_jobs.remove(job)
+        self._active_jobs.remove(job)
 
+        element = None
         if status == JobStatus.FAIL:
             # If it's an elementjob, we want to compare against the failure messages
             # and send the Element() instance if interactive failure handling. Note
             # this may change if the frontend is run in a separate process for pickling
-            element = job._element if (job.element_job and self._interactive_failure) else None
+            element = job._element if (job.element_job and self._interactive_failure) else element
 
         notification = Notification(NotificationType.JOB_COMPLETE,
                                     full_name=job.name,
@@ -317,9 +326,7 @@ class Scheduler():
                                     element=element)
         self._notify(notification)
 
-        if process_jobs:
-            # Now check for more jobs
-            self._sched()
+        self._sched()
 
     # check_cache_size():
     #
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 4de975e..e0f0842 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -38,7 +38,7 @@ from ._artifactelement import verify_artifact_ref, ArtifactElement
 from ._exceptions import StreamError, ImplError, BstError, ArtifactElementError, ArtifactError, set_last_task_error
 from ._message import Message, MessageType
 from ._scheduler import Scheduler, SchedStatus, TrackQueue, FetchQueue, \
-    SourcePushQueue, BuildQueue, PullQueue, ArtifactPushQueue, NotificationType, JobStatus
+    SourcePushQueue, BuildQueue, PullQueue, ArtifactPushQueue, NotificationType, JobStatus, Notification
 from ._pipeline import Pipeline, PipelineSelection
 from ._profile import Topics, PROFILER
 from ._state import State
@@ -114,7 +114,6 @@ class Stream():
         try:
             func(*args, **kwargs)
         except Exception as e:
-            from ._scheduler.scheduler import Notification, NotificationType
             queue.put(Notification(NotificationType.EXCEPTION, exception=e))
 
     def run_in_subprocess(self, func, *args, **kwargs):
@@ -367,6 +366,7 @@ class Stream():
         if track_elements:
             self._enqueue_plan(track_elements, queue=track_queue)
         self._enqueue_plan(elements)
+
         self._run()
 
     # fetch()
@@ -1646,15 +1646,14 @@ class Stream():
         elif notification.notification_type == NotificationType.JOB_COMPLETE:
             self._state.remove_task(notification.job_action, notification.full_name)
             if notification.job_status == JobStatus.FAIL:
-                if notification.failed_element:
-                    unique_id = notification.full_name
-                else:
-                    unique_id = None
-                self._state.fail_task(notification.job_action, notification.full_name, unique_id)
+                self._state.fail_task(notification.job_action, notification.full_name,
+                                      notification.failed_element, notification.element)
         elif notification.notification_type == NotificationType.EXCEPTION:
             raise notification.exception
         elif notification.notification_type == NotificationType.TASK_ERROR:
             set_last_task_error(notification.domain, notification.reason)
+        elif notification.notification_type == NotificationType.QUEUES:
+            self.queues = notification.queues
         else:
             raise StreamError("Unreccognised notification type recieved")
 


[buildstream] 08/19: Send scheduler notifications over a multiprocessing queue

Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch phil/ui-split-refactor
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit e39c94064de749d31bf41987fccaa19fc129a975
Author: Phil Dawson <ph...@codethink.co.uk>
AuthorDate: Thu Jun 20 10:18:49 2019 +0100

    Send scheduler notifications over a multiprocessing queue
---
 src/buildstream/_scheduler/scheduler.py | 17 ++++++++++-------
 src/buildstream/_stream.py              |  4 +++-
 2 files changed, 13 insertions(+), 8 deletions(-)

diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 9e120e4..14ecf30 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -98,7 +98,7 @@ class Notification:
 class Scheduler():
 
     def __init__(self, context,
-                 start_time, state, notification_handler,
+                 start_time, state, notification_queue,
                  interrupt_callback=None,
                  ticker_callback=None,
                  interactive_failure=False):
@@ -131,8 +131,8 @@ class Scheduler():
         self._cleanup_scheduled = False       # Whether we have a cleanup job scheduled
         self._cleanup_running = None          # A running CleanupJob, or None
 
-        # Callback to send notifications to report back to the Scheduler's owner
-        self.notify = notification_handler
+        # Queue used to send notifications back to the Scheduler's owner
+        self._notification_queue = notification_queue
 
         # Whether our exclusive jobs, like 'cleanup' are currently already
         # waiting or active.
@@ -305,7 +305,7 @@ class Scheduler():
                                     job_status=status,
                                     failed_element=job.element_job,
                                     element=element)
-        self.notify(notification)
+        self._notify(notification)
 
         if process_jobs:
             # Now check for more jobs
@@ -369,7 +369,7 @@ class Scheduler():
                                     full_name=job.name,
                                     job_action=job.action_name,
                                     elapsed_time=self.elapsed_time())
-        self.notify(notification)
+        self._notify(notification)
         job.start()
 
     # Callback for the cache size job
@@ -589,7 +589,7 @@ class Scheduler():
             return
 
         notification = Notification(NotificationType.INTERRUPT)
-        self.notify(notification)
+        self._notify(notification)
 
     # _terminate_event():
     #
@@ -649,9 +649,12 @@ class Scheduler():
     # Regular timeout for driving status in the UI
     def _tick(self):
         notification = Notification(NotificationType.TICK)
-        self.notify(notification)
+        self._notify(notification)
         self.loop.call_later(1, self._tick)
 
+    def _notify(self, notification):
+        self._notification_queue.put(notification)
+
     def __getstate__(self):
         # The only use-cases for pickling in BuildStream at the time of writing
         # are enabling the 'spawn' method of starting child processes, and
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 45bb41b..2d451c6 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -21,6 +21,7 @@
 
 import itertools
 import functools
+import multiprocessing as mp
 import os
 import sys
 import stat
@@ -76,6 +77,7 @@ class Stream():
         #
         # Private members
         #
+        self._notification_queue = mp.Queue()
         self._context = context
         self._artifacts = None
         self._sourcecache = None
@@ -85,7 +87,7 @@ class Stream():
 
         context.messenger.set_state(self._state)
 
-        self._scheduler = Scheduler(context, session_start, self._state, self._scheduler_notification_handler,
+        self._scheduler = Scheduler(context, session_start, self._state, self._notification_queue,
                                     interrupt_callback=interrupt_callback,
                                     ticker_callback=ticker_callback,
                                     interactive_failure=interactive_failure)
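
Swapping the handler callback for a queue decouples the scheduler from its
owner: _notify only ever puts, and the owning side decides when to consume.
A reduced sketch of that shape (not the real Scheduler API):

    import multiprocessing as mp

    class Scheduler():

        def __init__(self, notification_queue):
            self._notification_queue = notification_queue

        def _notify(self, notification):
            # Fire and forget; the consumer may live in another process.
            self._notification_queue.put(notification)

    if __name__ == '__main__':
        q = mp.Queue()
        Scheduler(q)._notify("tick")
        print(q.get())  # -> tick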


[buildstream] 05/19: WIP: Refactor scheduler-frontend communication

Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch phil/ui-split-refactor
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit b7b445cdb0de0223015c71506d8689135151c966
Author: Phil Dawson <ph...@codethink.co.uk>
AuthorDate: Thu Jun 13 17:47:04 2019 +0100

    WIP: Refactor scheduler-frontend communication
---
 src/buildstream/_frontend/app.py        |  2 +
 src/buildstream/_scheduler/__init__.py  |  2 +-
 src/buildstream/_scheduler/scheduler.py | 76 +++++++++++++++++++++++++--------
 src/buildstream/_stream.py              | 25 ++++++++++-
 4 files changed, 85 insertions(+), 20 deletions(-)

diff --git a/src/buildstream/_frontend/app.py b/src/buildstream/_frontend/app.py
index 76e3bc7..9f90938 100644
--- a/src/buildstream/_frontend/app.py
+++ b/src/buildstream/_frontend/app.py
@@ -607,6 +607,7 @@ class App():
                 pass
             return
 
+        assert False
         # Interactive mode for element failures
         with self._interrupted():
 
@@ -646,6 +647,7 @@ class App():
 
                 # Handle choices which you can come back from
                 #
+                assert choice != 'shell'  # This won't work for now
                 if choice == 'shell':
                     click.echo("\nDropping into an interactive shell in the failed build sandbox\n", err=True)
                     try:
diff --git a/src/buildstream/_scheduler/__init__.py b/src/buildstream/_scheduler/__init__.py
index d2f458f..d689d6e 100644
--- a/src/buildstream/_scheduler/__init__.py
+++ b/src/buildstream/_scheduler/__init__.py
@@ -26,5 +26,5 @@ from .queues.buildqueue import BuildQueue
 from .queues.artifactpushqueue import ArtifactPushQueue
 from .queues.pullqueue import PullQueue
 
-from .scheduler import Scheduler, SchedStatus
+from .scheduler import Scheduler, SchedStatus, Notification, NotificationType
 from .jobs import ElementJob, JobStatus
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 4f668c6..a9286d4 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -21,6 +21,7 @@
 # System imports
 import os
 import asyncio
+import enum
 from itertools import chain
 import signal
 import datetime
@@ -45,6 +46,34 @@ _ACTION_NAME_CLEANUP = 'clean'
 _ACTION_NAME_CACHE_SIZE = 'size'
 
 
+@enum.unique
+class NotificationType(enum.Enum):
+    INTERRUPT = "interrupt"
+    JOB_START = "job_start"
+    JOB_COMPLETE = "job_complete"
+    TICK = "tick"
+
+
+class Notification:
+
+    def __init__(self,
+                 notification_type,
+                 *,
+                 full_name=None,
+                 job_action=None,
+                 job_status=None,
+                 failed_element=False,
+                 elapsed_time=None,
+                 element=None):
+        self.notification_type = notification_type
+        self.full_name = full_name
+        self.job_action = job_action
+        self.job_status = job_status
+        self.failed_element = failed_element
+        self.elapsed_time = elapsed_time
+        self.element = element
+
+
 # Scheduler()
 #
 # The scheduler operates on a list queues, each of which is meant to accomplish
@@ -69,7 +98,7 @@ _ACTION_NAME_CACHE_SIZE = 'size'
 class Scheduler():
 
     def __init__(self, context,
-                 start_time, state,
+                 start_time, state, message_handler,
                  interrupt_callback=None,
                  ticker_callback=None,
                  interactive_failure=False):
@@ -102,9 +131,17 @@ class Scheduler():
         self._cleanup_scheduled = False       # Whether we have a cleanup job scheduled
         self._cleanup_running = None          # A running CleanupJob, or None
 
-        # Callbacks to report back to the Scheduler owner
-        self._interrupt_callback = interrupt_callback
-        self._ticker_callback = ticker_callback
+        # Callback to send messages to report back to the Scheduler's owner
+        self.message = message_handler
+
+        # Whether our exclusive jobs, like 'cleanup' are currently already
+        # waiting or active.
+        #
+        # This is just a bit quicker than scanning the wait queue and active
+        # queue and comparing job action names.
+        #
+        self._exclusive_waiting = set()
+        self._exclusive_active = set()
 
         self.resources = Resources(context.sched_builders,
                                    context.sched_fetchers,
@@ -134,8 +171,7 @@ class Scheduler():
         asyncio.set_event_loop(self.loop)
 
         # Add timeouts
-        if self._ticker_callback:
-            self.loop.call_later(1, self._tick)
+        self.loop.call_later(1, self._tick)
 
         # Handle unix signals while running
         self._connect_signals()
@@ -254,13 +290,19 @@ class Scheduler():
         # Remove from the active jobs list
         self._active_jobs.remove(job)
 
-        self._state.remove_task(job.action_name, job.name)
         if status == JobStatus.FAIL:
             # If it's an elementjob, we want to compare against the failure messages
             # and send the Element() instance if interactive failure handling. Note
             # this may change if the frontend is run in a separate process for pickling
             element = job._element if (job.element_job and self._interactive_failure) else None
-            self._state.fail_task(job.action_name, job.name, element_job=job.element_job, element=element)
+
+            message = Notification(NotificationType.JOB_COMPLETE,
+                                   full_name=job.name,
+                                   job_action=job.action_name,
+                                   job_status=status,
+                                   failed_element=job.element_job,
+                                   element=element)
+            self.message(message)
 
         # Now check for more jobs
         self._sched()
@@ -319,7 +361,11 @@ class Scheduler():
     #
     def _start_job(self, job):
         self._active_jobs.append(job)
-        self._state.add_task(job.action_name, job.name, self.elapsed_time())
+        message = Notification(NotificationType.JOB_START,
+                               full_name=job.name,
+                               job_action=job.action_name,
+                               elapsed_time=self.elapsed_time())
+        self.message(message)
         job.start()
 
     # Callback for the cache size job
@@ -538,13 +584,8 @@ class Scheduler():
         if self.terminated:
             return
 
-        # Leave this to the frontend to decide, if no
-        # interrupt callback was specified, then just terminate.
-        if self._interrupt_callback:
-            self._interrupt_callback()
-        else:
-            # Default without a frontend is just terminate
-            self.terminate_jobs()
+        message = Notification(NotificationType.INTERRUPT)
+        self.message(message)
 
     # _terminate_event():
     #
@@ -603,7 +644,8 @@ class Scheduler():
 
     # Regular timeout for driving status in the UI
     def _tick(self):
-        self._ticker_callback()
+        message = Notification(NotificationType.TICK)
+        self.message(message)
         self.loop.call_later(1, self._tick)
 
     def __getstate__(self):
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 5f12889..45bb41b 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -36,7 +36,7 @@ from ._artifactelement import verify_artifact_ref, ArtifactElement
 from ._exceptions import StreamError, ImplError, BstError, ArtifactElementError, ArtifactError
 from ._message import Message, MessageType
 from ._scheduler import Scheduler, SchedStatus, TrackQueue, FetchQueue, \
-    SourcePushQueue, BuildQueue, PullQueue, ArtifactPushQueue
+    SourcePushQueue, BuildQueue, PullQueue, ArtifactPushQueue, NotificationType, JobStatus
 from ._pipeline import Pipeline, PipelineSelection
 from ._profile import Topics, PROFILER
 from ._state import State
@@ -85,12 +85,14 @@ class Stream():
 
         context.messenger.set_state(self._state)
 
-        self._scheduler = Scheduler(context, session_start, self._state,
+        self._scheduler = Scheduler(context, session_start, self._state, self._scheduler_notification_handler,
                                     interrupt_callback=interrupt_callback,
                                     ticker_callback=ticker_callback,
                                     interactive_failure=interactive_failure)
         self._first_non_track_queue = None
         self._session_start_callback = session_start_callback
+        self._ticker_callback = ticker_callback
+        self._interrupt_callback = interrupt_callback
 
     # init()
     #
@@ -1572,6 +1574,25 @@ class Stream():
 
         return element_targets, artifact_refs
 
+    def _scheduler_notification_handler(self, notification):
+        if notification.notification_type == NotificationType.INTERRUPT:
+            self._interrupt_callback()
+        elif notification.notification_type == NotificationType.TICK:
+            self._ticker_callback()
+        elif notification.notification_type == NotificationType.JOB_START:
+            self._state.add_task(notification.job_action, notification.full_name, notification.elapsed_time)
+
+        elif notification.notification_type == NotificationType.JOB_COMPLETE:
+            self._state.remove_task(notification.job_action, notification.full_name)
+            if notification.job_status == JobStatus.FAIL:
+                if notification.failed_element:
+                    unique_id = notification.full_name
+                else:
+                    unique_id = None
+                self._state.fail_task(notification.job_action, notification.full_name, unique_id)
+        else:
+            raise StreamError("Unrecognised notification type received")
+
     def __getstate__(self):
         # The only use-cases for pickling in BuildStream at the time of writing
         # are enabling the 'spawn' method of starting child processes, and
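
At its core the new notification plumbing is an enum tag plus a dispatch
function, shown here as a reduced sketch (two notification types only,
simplified names):

    import enum

    @enum.unique
    class NotificationType(enum.Enum):
        INTERRUPT = "interrupt"
        TICK = "tick"

    class Notification():

        def __init__(self, notification_type):
            self.notification_type = notification_type

    def handler(notification):
        # One branch per tag, and a hard error for anything unknown.
        if notification.notification_type == NotificationType.INTERRUPT:
            print("interrupt requested")
        elif notification.notification_type == NotificationType.TICK:
            print("tick")
        else:
            raise ValueError("Unrecognised notification type received")

    handler(Notification(NotificationType.TICK))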


[buildstream] 12/19: Change subprocess.run to start

Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch phil/ui-split-refactor
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit be6e68ff519c3185252149a98d59f42dc72925bd
Author: Phil Dawson <ph...@codethink.co.uk>
AuthorDate: Thu Jul 4 14:44:29 2019 +0100

    Change subprocess.run to start
---
 src/buildstream/_stream.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index d2a4d08..0fe2234 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -66,7 +66,7 @@ def subprocessed(func):
         target = functools.partial(func, self, *args, **kwargs)
         print("launching subprocess:", process_name)
         self._subprocess = mp_context.Process(target=target, name=process_name)
-        self._subprocess.run()
+        self._subprocess.start()
 
         # TODO connect signal handlers
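
The one-word change matters: Process.run() simply calls the target in the
*current* process, while Process.start() is what actually forks or spawns a
child. A quick demonstration of the difference:

    import multiprocessing as mp
    import os

    def report():
        print("target ran in pid", os.getpid())

    if __name__ == '__main__':
        print("parent is pid", os.getpid())
        mp.Process(target=report).run()   # runs inline: same pid
        p = mp.Process(target=report)
        p.start()                         # runs in a new pid
        p.join()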
 


[buildstream] 16/19: fixup! fixup! fixup! Move subprocess machinery into a method

Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch phil/ui-split-refactor
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 9b491a9e2f7ac60da0e0257841f44545475a194c
Author: Phil Dawson <ph...@codethink.co.uk>
AuthorDate: Fri Jul 5 15:51:54 2019 +0100

    fixup! fixup! fixup! Move subprocess machinery into a method
---
 src/buildstream/_stream.py | 2 --
 1 file changed, 2 deletions(-)

diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index ed83747..89d6539 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -114,9 +114,7 @@ class Stream():
 
         global notification_count
         notification_count = 0
-        # TODO use functools to pass arguments to func to make target for subprocess
 
-        # Start subprocessed work
         mp_context = mp.get_context(method='fork')
         process_name = "stream-{}".format(func.__name__)
         print("launchinglaunching subprocess:", process_name)


[buildstream] 06/19: Add workaround for buildqueue job_complete direct callback

Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch phil/ui-split-refactor
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit af619ea27e6da79d809f436c93a5ba8faa7f7738
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Tue Jun 18 12:34:22 2019 +0100

    Add workaround for buildqueue job_complete direct callback
---
 src/buildstream/_scheduler/scheduler.py | 14 +++++++++-----
 1 file changed, 9 insertions(+), 5 deletions(-)

diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index a9286d4..5bf0764 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -284,11 +284,14 @@ class Scheduler():
     #    queue (Queue): The Queue holding a complete job
     #    job (Job): The completed Job
     #    status (JobStatus): The status of the completed job
+    #    process_jobs (bool): If the scheduler should also process the
+    #                         job, else just generate the notification
     #
-    def job_completed(self, job, status):
+    def job_completed(self, job, status, process_jobs=True):
 
-        # Remove from the active jobs list
-        self._active_jobs.remove(job)
+        if process_jobs:
+            # Remove from the active jobs list
+            self._active_jobs.remove(job)
 
         if status == JobStatus.FAIL:
             # If it's an elementjob, we want to compare against the failure messages
@@ -304,8 +307,9 @@ class Scheduler():
                                    element=element)
             self.message(message)
 
-        # Now check for more jobs
-        self._sched()
+        if process_jobs:
+            # Now check for more jobs
+            self._sched()
 
     # check_cache_size():
     #


[buildstream] 02/19: plugin.py: cache full_name member in __init__

Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch phil/ui-split-refactor
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit c5db3d822a2433b97ee52e9108da135edf4ba13e
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Tue Aug 6 13:15:58 2019 +0100

    plugin.py: cache full_name member in __init__
    
    Once project & type are resolved, the full_name can be computed
    and cached for efficiency. The accessor for getting the private
    member should also be moved to the correct section.
---
 src/buildstream/plugin.py | 29 ++++++++++++++++++++++-------
 1 file changed, 22 insertions(+), 7 deletions(-)

diff --git a/src/buildstream/plugin.py b/src/buildstream/plugin.py
index d963916..506eba5 100644
--- a/src/buildstream/plugin.py
+++ b/src/buildstream/plugin.py
@@ -246,6 +246,9 @@ class Plugin():
         self.__type_tag = type_tag      # The type of plugin (element or source)
         self.__configuring = False      # Whether we are currently configuring
 
+        # Set the full_name as project & type_tag are resolved
+        self.__full_name = self.__set_full_name()
+
         # Infer the kind identifier
         modulename = type(self).__module__
         self.__kind = modulename.split('.')[-1]
@@ -672,6 +675,18 @@ class Plugin():
     def _preflight(self):
         self.preflight()
 
+    # _get_full_name():
+    #
+    # The instance full name of the plugin prepended with the owning
+    # junction if appropriate. This is the name of the given element,
+    # as opposed to the class name of the underlying plugin __kind identifier.
+    #
+    # Returns:
+    #    (str): element full name, with prepended owning junction if appropriate
+    #
+    def _get_full_name(self):
+        return self.__full_name
+
     # _get_args_for_child_job_pickling(self)
     #
     # Return data necessary to reconstruct this object in a child job process.
@@ -728,13 +743,6 @@ class Plugin():
         output.flush()
         self.status('Running host command', detail=command)
 
-    def _get_full_name(self):
-        project = self.__project
-        if project.junction:
-            return '{}:{}'.format(project.junction.name, self.name)
-        else:
-            return self.name
-
     def __deprecation_warning_silenced(self):
         if not self.BST_PLUGIN_DEPRECATED:
             return False
@@ -751,6 +759,13 @@ class Plugin():
 
             return self.get_kind() in silenced_warnings
 
+    def __set_full_name(self):
+        project = self.__project
+        if project.junction:
+            return '{}:{}'.format(project.junction.name, self.name)
+        else:
+            return self.name
+
 
 # A local table for _prefix_warning()
 #
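The change is the standard move of hoisting a pure computation out of a hot accessor into __init__, valid because both inputs are fixed by construction time. A minimal self-contained sketch of the same pattern (hypothetical class and names, not BuildStream API):

    class CachedNamePlugin:
        def __init__(self, name, junction_name=None):
            self.name = name
            # Both inputs are fixed at construction time, so the derived
            # value can be computed once rather than on every access.
            self.__full_name = (
                '{}:{}'.format(junction_name, name) if junction_name else name
            )

        def _get_full_name(self):
            # The hot path is now a plain attribute read.
            return self.__full_name

    assert CachedNamePlugin('app.bst', 'base')._get_full_name() == 'base:app.bst'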


[buildstream] 15/19: fixup! fixup! Move subprocess machinery into a method

Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch phil/ui-split-refactor
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit d851dec4e2166291a184cf85fdc474ce1a64448f
Author: Phil Dawson <ph...@codethink.co.uk>
AuthorDate: Fri Jul 5 15:45:00 2019 +0100

    fixup! fixup! Move subprocess machinery into a method
---
 src/buildstream/_stream.py | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 1de8364..ed83747 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -106,8 +106,6 @@ class Stream():
     def init(self):
         self._artifacts = self._context.artifactcache
         self._sourcecache = self._context.sourcecache
-        print(Stream.build, Stream.build.__qualname__, Stream.build.__name__, Stream.build.__module__,
-              id(Stream.build))
 
     def run_in_subprocess(self, func, *args, **kwargs):
         print("Args: {}".format([*args]))
@@ -126,7 +124,7 @@ class Stream():
         self._subprocess.start()
 
         # TODO connect signal handlers
-        while self._subprocess.exitcode is not None:
+        while self._subprocess.exitcode is not:
             self._subprocess.join(0.1)
             self._loop()
         print("Stopping loop...")


[buildstream] 17/19: Some tweaks

Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch phil/ui-split-refactor
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit c2dafd2dc11af73838ae19e337f94687244f60cc
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Mon Jul 8 12:34:05 2019 +0100

    Some tweaks
---
 src/buildstream/_stream.py | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 89d6539..afbeb20 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -112,9 +112,6 @@ class Stream():
         print("Kwargs: {}".format(list(kwargs.items())))
         assert not self._subprocess
 
-        global notification_count
-        notification_count = 0
-
         mp_context = mp.get_context(method='fork')
         process_name = "stream-{}".format(func.__name__)
         print("launchinglaunching subprocess:", process_name)
@@ -122,8 +119,10 @@ class Stream():
         self._subprocess.start()
 
         # TODO connect signal handlers
-        while self._subprocess.exitcode is not:
+        while self._subprocess.exitcode is None:
+            # check every given time interval on subprocess state
             self._subprocess.join(0.1)
+            # if no exit code, go back to checking the message queue
             self._loop()
         print("Stopping loop...")
 


[buildstream] 10/19: stream.py: Minimal implementation of event loop function

Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch phil/ui-split-refactor
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 48847cdffdee48e8698011be6bdc1660eb08ae14
Author: Phil Dawson <ph...@codethink.co.uk>
AuthorDate: Thu Jun 20 11:12:57 2019 +0100

    stream.py: Minimal implementation of event loop function
---
 src/buildstream/_stream.py | 15 +++++++++++++++
 1 file changed, 15 insertions(+)

diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 4a1edcf..e8b3931 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -1623,3 +1623,18 @@ class Stream():
         # subprocessed work has completed
 
         # Return result of subprocessed function
+
+    # The code to be run by the Stream's event loop while delegating
+    # work to a subprocess with the @subprocessed decorator
+    def _loop(self):
+        assert self._notification_queue
+        # Check that the subprocessed work has not finished
+        # TODO
+
+        # Check for new messages
+        notification = self._notification_queue.get(block=True, timeout=0.1)
+
+        # Process new messages
+        if notification:
+            self._scheduler_notification_handler(notification)
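One caveat worth noting: multiprocessing.Queue.get() with a timeout raises queue.Empty on expiry rather than returning None, so the 'if notification:' guard above never sees the empty case. A defensive sketch of the same loop (a sketch, not the committed code):

    import queue

    def _loop(self):
        assert self._notification_queue
        try:
            # Wait briefly for a notification from the scheduler side
            notification = self._notification_queue.get(block=True, timeout=0.1)
        except queue.Empty:
            return  # nothing arrived this tick
        self._scheduler_notification_handler(notification)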
+


[buildstream] 18/19: Support exception handling from the subprocess

Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch phil/ui-split-refactor
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 07bb725bbf4817a4982f3eaeb5a4268bade3e9c7
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Mon Jul 15 11:51:18 2019 +0100

    Support exception handling from the subprocess
---
 src/buildstream/_exceptions.py             |  3 +-
 src/buildstream/_scheduler/jobs/job.py     |  2 +
 src/buildstream/_scheduler/queues/queue.py |  1 +
 src/buildstream/_scheduler/scheduler.py    | 27 +++++++++++++-
 src/buildstream/_stream.py                 | 59 ++++++++++++++++++++++--------
 src/buildstream/testing/runcli.py          |  2 +-
 src/buildstream/utils.py                   |  4 ++
 7 files changed, 80 insertions(+), 18 deletions(-)

diff --git a/src/buildstream/_exceptions.py b/src/buildstream/_exceptions.py
index d5b87a8..e977660 100644
--- a/src/buildstream/_exceptions.py
+++ b/src/buildstream/_exceptions.py
@@ -238,7 +238,8 @@ class LoadErrorReason(Enum):
 # interpreting project YAML
 #
 class LoadError(BstError):
-    def __init__(self, message, reason, *, detail=None):
+    def __init__(self, message, reason=None, *, detail=None):
+        # The second parameter needs a default value due to an unpickling issue; unpleasant, but necessary.
         super().__init__(message, detail=detail, domain=ErrorDomain.LOAD, reason=reason)
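The comment refers to how pickled exceptions are rebuilt: BaseException.__reduce__() replays only the positional args captured by BaseException.__init__, so a constructor with an extra required parameter cannot be re-called on the receiving side. A self-contained illustration (hypothetical classes, not BuildStream code):

    import pickle

    class StrictError(Exception):
        def __init__(self, message, reason):        # 'reason' is required
            super().__init__(message)               # args == (message,)
            self.reason = reason

    class LenientError(Exception):
        def __init__(self, message, reason=None):   # survives the round trip
            super().__init__(message)
            self.reason = reason

    ok = pickle.loads(pickle.dumps(LenientError("boom", reason="demo")))
    print(ok.reason)    # None -- state outside 'args' is dropped, but no crash
    try:
        pickle.loads(pickle.dumps(StrictError("boom", "demo")))
    except TypeError as e:
        print("unpickle failed:", e)                # missing 'reason'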
 
 
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index d80d1a9..9e3ca13 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -488,6 +488,8 @@ class Job():
             # is currently managed in _exceptions.py
             set_last_task_error(envelope.message['domain'],
                                 envelope.message['reason'])
+            self._scheduler.set_last_task_error(envelope.message['domain'],
+                                                envelope.message['reason'])
         elif envelope.message_type is _MessageType.RESULT:
             assert self._result is None
             self._result = envelope.message
diff --git a/src/buildstream/_scheduler/queues/queue.py b/src/buildstream/_scheduler/queues/queue.py
index 8c81ce2..3435e3f 100644
--- a/src/buildstream/_scheduler/queues/queue.py
+++ b/src/buildstream/_scheduler/queues/queue.py
@@ -316,6 +316,7 @@ class Queue():
             # This just allows us stronger testing capability
             #
             set_last_task_error(e.domain, e.reason)
+            self._scheduler.set_last_task_error(e.domain, e.reason)
 
         except Exception:   # pylint: disable=broad-except
 
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 2e9c740..6649865 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -53,6 +53,8 @@ class NotificationType(enum.Enum):
     JOB_COMPLETE = "job_complete"
     TICK = "tick"
     EXCEPTION = "exception"
+    TASK_ERROR = "task_error"
+    SCHED_TERMINATE = "sched_terminate"
 
 
 class Notification:
@@ -66,7 +68,9 @@ class Notification:
                  failed_element=False,
                  elapsed_time=None,
                  element=None,
-                 exception=None):
+                 exception=None,
+                 domain=None,
+                 reason=None):
 
         self.notification_type = notification_type
         self.full_name = full_name
@@ -76,6 +80,8 @@ class Notification:
         self.elapsed_time = elapsed_time
         self.element = element
         self.exception = exception
+        self.domain = domain
+        self.reason = reason
 
 
 # Scheduler()
@@ -331,6 +337,12 @@ class Scheduler():
         #
         self._cache_size_scheduled = True
 
+    def set_last_task_error(self, domain, reason):
+        notification = Notification(NotificationType.TASK_ERROR,
+                                    domain=domain,
+                                    reason=reason)
+        self._notify(notification)
+
     #######################################################
     #                  Local Private Methods              #
     #######################################################
@@ -673,3 +685,16 @@ class Scheduler():
         # a new use-case arises.
         #
         raise TypeError("Scheduler objects should not be pickled.")
+
+    def _loop(self):
+        assert self._notification_queue
+        # Check for and process new messages
+        while True:
+            try:
+                notification = self._notification_queue.get_nowait()
+                if notification.notification_type == NotificationType.SCHED_TERMINATE:
+                    print("handling notifications")
+                    self.terminate_jobs()
+            except queue.Empty:
+                notification = None
+                break
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index afbeb20..4de975e 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -35,7 +35,7 @@ import queue
 
 from ._artifact import Artifact
 from ._artifactelement import verify_artifact_ref, ArtifactElement
-from ._exceptions import StreamError, ImplError, BstError, ArtifactElementError, ArtifactError
+from ._exceptions import StreamError, ImplError, BstError, ArtifactElementError, ArtifactError, set_last_task_error
 from ._message import Message, MessageType
 from ._scheduler import Scheduler, SchedStatus, TrackQueue, FetchQueue, \
     SourcePushQueue, BuildQueue, PullQueue, ArtifactPushQueue, NotificationType, JobStatus
@@ -107,6 +107,16 @@ class Stream():
         self._artifacts = self._context.artifactcache
         self._sourcecache = self._context.sourcecache
 
+    @staticmethod
+    def _subprocess_main(func, queue, *args, **kwargs):
+        # Set main process
+        utils._reset_main_pid()
+        try:
+            func(*args, **kwargs)
+        except Exception as e:
+            from ._scheduler.scheduler import Notification, NotificationType
+            queue.put(Notification(NotificationType.EXCEPTION, exception=e))
+
     def run_in_subprocess(self, func, *args, **kwargs):
         print("Args: {}".format([*args]))
         print("Kwargs: {}".format(list(kwargs.items())))
@@ -114,11 +124,16 @@ class Stream():
 
         mp_context = mp.get_context(method='fork')
         process_name = "stream-{}".format(func.__name__)
-        print("launchinglaunching subprocess:", process_name)
-        self._subprocess = mp_context.Process(target=func, args=args, kwargs=kwargs, name=process_name)
+        args = list(args)
+        args.insert(0, self._notification_queue)
+        args.insert(0, func)
+        print("launching subprocess:", process_name)
+        self._subprocess = mp_context.Process(target=Stream._subprocess_main, args=args,
+                                              kwargs=kwargs, name=process_name)
+
         self._subprocess.start()
 
-        # TODO connect signal handlers
+        # TODO connect signal handlers with asyncio
         while self._subprocess.exitcode is None:
             # check every given time interval on subprocess state
             self._subprocess.join(0.1)
@@ -126,13 +141,17 @@ class Stream():
             self._loop()
         print("Stopping loop...")
 
-        # try:
-        #     while True:
-        #         notification = self._notification_queue.get_nowait()
-        #         self._scheduler_notification_handler(notification)
-        # except queue.Empty:
-        #     print("Finished processing notifications")
-        #     pass
+        # Set main process back
+        utils._reset_main_pid()
+
+        # Ensure there are no more notifications to process
+        try:
+            while True:
+                notification = self._notification_queue.get_nowait()
+                self._scheduler_notification_handler(notification)
+        except queue.Empty:
+            print("Finished processing notifications")
+            pass
 
     # cleanup()
     #
@@ -1086,7 +1105,15 @@ class Stream():
     # Terminate jobs
     #
     def terminate(self):
+        #if self._scheduler.loop:
+            # Scheduler not in subprocess
         self._scheduler.terminate_jobs()
+        #else:
+        #    # Handle calling subprocessed scheduler outside of main process
+        #    assert self._notification_queue
+        #    from ._scheduler.scheduler import Notification, NotificationType
+        #    self._notification_queue.put(Notification(NotificationType.SCHED_TERMINATE))
+        #    self._scheduler.terminate_jobs()
 
     # quit()
     #
@@ -1625,8 +1652,9 @@ class Stream():
                     unique_id = None
                 self._state.fail_task(notification.job_action, notification.full_name, unique_id)
         elif notification.notification_type == NotificationType.EXCEPTION:
-            # TODO
-            pass
+            raise notification.exception
+        elif notification.notification_type == NotificationType.TASK_ERROR:
+            set_last_task_error(notification.domain, notification.reason)
         else:
+            raise StreamError("Unrecognised notification type received")
 
@@ -1653,8 +1681,9 @@ class Stream():
         while True:
             try:
                 notification = self._notification_queue.get_nowait()
-                print("handling notifications")
-                self._scheduler_notification_handler(notification)
+                if notification.notification_type != NotificationType.SCHED_TERMINATE:
+                    print("handling notifications")
+                    self._scheduler_notification_handler(notification)
             except queue.Empty:
                 notification = None
                 break
diff --git a/src/buildstream/testing/runcli.py b/src/buildstream/testing/runcli.py
index 95bf83e..ad71705 100644
--- a/src/buildstream/testing/runcli.py
+++ b/src/buildstream/testing/runcli.py
@@ -85,7 +85,6 @@ class Result():
         # in the case that the exit code reported is 0 (success).
         #
         if self.exit_code != 0:
-
             # Check if buildstream failed to handle an
             # exception, toplevel CLI exit should always
             # be a SystemExit exception.
@@ -149,6 +148,7 @@ class Result():
                     self.exception.domain,
                     self.exception.reason
                 ))
+
         assert self.exit_code == -1, fail_message
         assert self.exc is not None, fail_message
         assert self.exception is not None, fail_message
diff --git a/src/buildstream/utils.py b/src/buildstream/utils.py
index 2c57925..ed1e123 100644
--- a/src/buildstream/utils.py
+++ b/src/buildstream/utils.py
@@ -711,6 +711,10 @@ def _is_main_process():
     assert _main_pid is not None
     return os.getpid() == _main_pid
 
+def _reset_main_pid():
+    global _main_pid
+    _main_pid = os.getpid()
+
 
 # Recursively remove directories, ignoring file permissions as much as
 # possible.
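Taken together, the shape of the new exception plumbing is: run the real work in a forked child, trap any exception, ship it over the notification queue, and re-raise it in the parent (which is why LoadError above had to become pickle-safe). A standalone sketch with illustrative names, not the Stream implementation itself:

    import multiprocessing as mp

    def _child_main(func, errq, *args, **kwargs):
        # Run the work in the child; forward failures to the parent
        # instead of letting them vanish into a non-zero exit code.
        try:
            func(*args, **kwargs)
        except Exception as e:      # broad by design, as in the patch
            errq.put(e)             # the exception must be picklable

    def run_in_child(func, *args, **kwargs):
        ctx = mp.get_context('fork')
        errq = ctx.Queue()
        proc = ctx.Process(target=_child_main, args=(func, errq) + args, kwargs=kwargs)
        proc.start()
        proc.join()
        if not errq.empty():
            raise errq.get()        # surfaces in the parent, much like the
                                    # EXCEPTION notification handler above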