You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by no...@apache.org on 2020/12/29 12:31:05 UTC

[buildstream] 11/19: WIP: Add a way to run stream methods in a subprocess

This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch phil/ui-split-refactor
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit d12e62d7aeb8595133b0c732a4b0038ae2479a24
Author: Phil Dawson <ph...@codethink.co.uk>
AuthorDate: Thu Jul 4 09:55:51 2019 +0100

    WIP: Add a way to run stream methods in a subprocess
    
    Currently the frontend is getting stuck in an infinite loop.
    
    Also contains debugging print statements which will need to be
    stripped out.
---
 src/buildstream/_frontend/app.py        | 153 ++++++++++++++++----------------
 src/buildstream/_scheduler/scheduler.py |   6 +-
 src/buildstream/_stream.py              |  67 ++++++++++----
 3 files changed, 129 insertions(+), 97 deletions(-)

diff --git a/src/buildstream/_frontend/app.py b/src/buildstream/_frontend/app.py
index 9f90938..90070af 100644
--- a/src/buildstream/_frontend/app.py
+++ b/src/buildstream/_frontend/app.py
@@ -596,84 +596,83 @@ class App():
 
     def _handle_failure(self, element, queue, failure):
 
-        # Handle non interactive mode setting of what to do when a job fails.
-        if not self._interactive_failures:
+        # # Handle non interactive mode setting of what to do when a job fails.
+        # if not self._interactive_failures:
 
-            if self.context.sched_error_action == _SchedulerErrorAction.TERMINATE:
-                self.stream.terminate()
-            elif self.context.sched_error_action == _SchedulerErrorAction.QUIT:
-                self.stream.quit()
-            elif self.context.sched_error_action == _SchedulerErrorAction.CONTINUE:
-                pass
-            return
-
-        assert False
-        # Interactive mode for element failures
-        with self._interrupted():
-
-            summary = ("\n{} failure on element: {}\n".format(failure.action_name, element.name) +
-                       "\n" +
-                       "Choose one of the following options:\n" +
-                       "  (c)ontinue  - Continue queueing jobs as much as possible\n" +
-                       "  (q)uit      - Exit after all ongoing jobs complete\n" +
-                       "  (t)erminate - Terminate any ongoing jobs and exit\n" +
-                       "  (r)etry     - Retry this job\n")
-            if failure.logfile:
-                summary += "  (l)og       - View the full log file\n"
-            if failure.sandbox:
-                summary += "  (s)hell     - Drop into a shell in the failed build sandbox\n"
-            summary += "\nPressing ^C will terminate jobs and exit\n"
-
-            choices = ['continue', 'quit', 'terminate', 'retry']
-            if failure.logfile:
-                choices += ['log']
-            if failure.sandbox:
-                choices += ['shell']
-
-            choice = ''
-            while choice not in ['continue', 'quit', 'terminate', 'retry']:
-                click.echo(summary, err=True)
-
-                self._notify("BuildStream failure", "{} on element {}"
-                             .format(failure.action_name, element.name))
-
-                try:
-                    choice = click.prompt("Choice:", default='continue', err=True,
-                                          value_proc=_prefix_choice_value_proc(choices))
-                except click.Abort:
-                    # Ensure a newline after automatically printed '^C'
-                    click.echo("", err=True)
-                    choice = 'terminate'
-
-                # Handle choices which you can come back from
-                #
-                assert choice != 'shell'  # This won't work for now
-                if choice == 'shell':
-                    click.echo("\nDropping into an interactive shell in the failed build sandbox\n", err=True)
-                    try:
-                        prompt = self.shell_prompt(element)
-                        self.stream.shell(element, Scope.BUILD, prompt, isolate=True, usebuildtree='always')
-                    except BstError as e:
-                        click.echo("Error while attempting to create interactive shell: {}".format(e), err=True)
-                elif choice == 'log':
-                    with open(failure.logfile, 'r') as logfile:
-                        content = logfile.read()
-                        click.echo_via_pager(content)
-
-            if choice == 'terminate':
-                click.echo("\nTerminating all jobs\n", err=True)
-                self.stream.terminate()
-            else:
-                if choice == 'quit':
-                    click.echo("\nCompleting ongoing tasks before quitting\n", err=True)
-                    self.stream.quit()
-                elif choice == 'continue':
-                    click.echo("\nContinuing with other non failing elements\n", err=True)
-                elif choice == 'retry':
-                    click.echo("\nRetrying failed job\n", err=True)
-                    # FIXME: Outstandingly nasty modification of core state
-                    queue._task_group.failed_tasks.remove(element._get_full_name())
-                    queue.enqueue([element])
+        if self.context.sched_error_action == 'terminate':
+            self.stream.terminate()
+        elif self.context.sched_error_action == 'quit':
+            self.stream.quit()
+        elif self.context.sched_error_action == 'continue':
+            pass
+        return
+
+        # assert False
+        # # Interactive mode for element failures
+        # with self._interrupted():
+
+        #     summary = ("\n{} failure on element: {}\n".format(failure.action_name, element.name) +
+        #                "\n" +
+        #                "Choose one of the following options:\n" +
+        #                "  (c)ontinue  - Continue queueing jobs as much as possible\n" +
+        #                "  (q)uit      - Exit after all ongoing jobs complete\n" +
+        #                "  (t)erminate - Terminate any ongoing jobs and exit\n" +
+        #                "  (r)etry     - Retry this job\n")
+        #     if failure.logfile:
+        #         summary += "  (l)og       - View the full log file\n"
+        #     if failure.sandbox:
+        #         summary += "  (s)hell     - Drop into a shell in the failed build sandbox\n"
+        #     summary += "\nPressing ^C will terminate jobs and exit\n"
+
+        #     choices = ['continue', 'quit', 'terminate', 'retry']
+        #     if failure.logfile:
+        #         choices += ['log']
+        #     if failure.sandbox:
+        #         choices += ['shell']
+
+        #     choice = ''
+        #     while choice not in ['continue', 'quit', 'terminate', 'retry']:
+        #         click.echo(summary, err=True)
+
+        #         self._notify("BuildStream failure", "{} on element {}"
+        #                      .format(failure.action_name, element.name))
+
+        #         try:
+        #             choice = click.prompt("Choice:", default='continue', err=True,
+        #                                   value_proc=_prefix_choice_value_proc(choices))
+        #         except click.Abort:
+        #             # Ensure a newline after automatically printed '^C'
+        #             click.echo("", err=True)
+        #             choice = 'terminate'
+
+        #         # Handle choices which you can come back from
+        #         #
+        #         assert choice != 'shell'  # This won't work for now
+        #         if choice == 'shell':
+        #             click.echo("\nDropping into an interactive shell in the failed build sandbox\n", err=True)
+        #             try:
+        #                 prompt = self.shell_prompt(element)
+        #                 self.stream.shell(element, Scope.BUILD, prompt, isolate=True, usebuildtree='always')
+        #             except BstError as e:
+        #                 click.echo("Error while attempting to create interactive shell: {}".format(e), err=True)
+        #         elif choice == 'log':
+        #             with open(failure.logfile, 'r') as logfile:
+        #                 content = logfile.read()
+        #                 click.echo_via_pager(content)
+
+        #     if choice == 'terminate':
+        #         click.echo("\nTerminating all jobs\n", err=True)
+        #         self.stream.terminate()
+        #     else:
+        #         if choice == 'quit':
+        #             click.echo("\nCompleting ongoing tasks before quitting\n", err=True)
+        #             self.stream.quit()
+        #         elif choice == 'continue':
+        #             click.echo("\nContinuing with other non failing elements\n", err=True)
+        #         elif choice == 'retry':
+        #             click.echo("\nRetrying failed job\n", err=True)
+        #             queue.failed_elements.remove(element)
+        #             queue.enqueue([element])
 
     #
     # Print the session heading if we've loaded a pipeline and there
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 14ecf30..2e9c740 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -52,6 +52,7 @@ class NotificationType(enum.Enum):
     JOB_START = "job_start"
     JOB_COMPLETE = "job_complete"
     TICK = "tick"
+    EXCEPTION = "exception"
 
 
 class Notification:
@@ -64,7 +65,9 @@ class Notification:
                  job_status=None,
                  failed_element=False,
                  elapsed_time=None,
-                 element=None):
+                 element=None,
+                 exception=None):
+
         self.notification_type = notification_type
         self.full_name = full_name
         self.job_action = job_action
@@ -72,6 +75,7 @@ class Notification:
         self.failed_element = failed_element
         self.elapsed_time = elapsed_time
         self.element = element
+        self.exception = exception
 
 
 # Scheduler()
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index e8b3931..d2a4d08 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -19,6 +19,7 @@
 #        Jürg Billeter <ju...@codethink.co.uk>
 #        Tristan Maat <tr...@codethink.co.uk>
 
+import asyncio
 import itertools
 import functools
 import multiprocessing as mp
@@ -31,6 +32,7 @@ import tarfile
 import tempfile
 from contextlib import contextmanager, suppress
 from fnmatch import fnmatch
+import queue
 
 from ._artifact import Artifact
 from ._artifactelement import verify_artifact_ref, ArtifactElement
@@ -46,6 +48,40 @@ from . import utils, _yaml, _site
 from . import Scope, Consistency
 
 
+# A decorator which causes the decorated method to be run in a subprocess
+def subprocessed(func):
+
+    @functools.wraps(func)
+    def _subprocessed(self, *args, **kwargs):
+        assert self
+        print("Args: {}".format([*args]))
+        print("Kwargs: {}".format(list(kwargs.items())))
+        assert not self._subprocess
+
+        # TODO use functools to pass arguments to func to make target for subprocess
+
+        # Start subprocessed work
+        mp_context = mp.get_context(method='spawn')
+        process_name = "stream-{}".format(func.__name__)
+        target = functools.partial(func, self, *args, **kwargs)
+        print("launching subprocess:", process_name)
+        self._subprocess = mp_context.Process(target=target, name=process_name)
+        self._subprocess.run()
+
+        # TODO connect signal handlers
+
+        # Run event loop. This event loop should exit once the
+        # subprocessed work has completed
+        print("Starting loop...")
+        while not self._subprocess.exitcode:
+            self._loop()
+        print("Stopping loop...")
+
+        # Return result of subprocessed function
+
+    return _subprocessed
+
+
 # Stream()
 #
 # This is the main, toplevel calling interface in BuildStream core.
@@ -77,6 +113,7 @@ class Stream():
         #
         # Private members
         #
+        self._subprocess = None
         self._notification_queue = mp.Queue()
         self._context = context
         self._artifacts = None
@@ -245,6 +282,7 @@ class Stream():
     # If `remote` specified as None, then regular configuration will be used
     # to determine where to push artifacts to.
     #
+    @subprocessed
     def build(self, targets, *,
               track_targets=None,
               track_except=None,
@@ -1592,6 +1630,9 @@ class Stream():
                 else:
                     unique_id = None
                 self._state.fail_task(notification.job_action, notification.full_name, unique_id)
+        elif notification.notification_type == NotificationType.EXCEPTION:
+            # TODO
+            pass
         else:
             raise StreamError("Unreccognised notification type recieved")
 
@@ -1610,31 +1651,19 @@ class Stream():
         #
         raise TypeError("Stream objects should not be pickled.")
 
-    # TODO
-    # Causes the decorated method to be run in a subprocess
-    @contextmanager
-    def subprocessed(self, func, *args, **kwargs):
-        pass
-        # Set up event loop
-
-        # Start subprocessed work
-
-        # Run event loop. This event loop should exit once the
-        # subprocessed work has completed
-
-        # Return result of subprocessed function
-
     # The code to be run by the Stream's event loop while delegating
-    # work to a subprocess with the @subprocessed
+    # work to a subprocess with the @subprocessed decorator
     def _loop(self):
         assert self._notification_queue
-        # Check that the subprocessed work has not finished
-        # TODO
 
         # Check for new messages
-        notification = self._notification_queue.get(block=True, timeout=0.1)
+        try:
+            notification = self._notification_queue.get(block=True, timeout=0.1)
+        except queue.Empty:
+            notification = None
+            print("queue empty, continuing...")
 
         # Process new messages
         if notification:
+            print("handling notifications")
             self._scheduler_notification_handler(notification)
-