You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by no...@apache.org on 2020/12/29 12:31:05 UTC
[buildstream] 11/19: WIP: Add a way to run stream methods in a
subprocess
This is an automated email from the ASF dual-hosted git repository.
not-in-ldap pushed a commit to branch phil/ui-split-refactor
in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit d12e62d7aeb8595133b0c732a4b0038ae2479a24
Author: Phil Dawson <ph...@codethink.co.uk>
AuthorDate: Thu Jul 4 09:55:51 2019 +0100
WIP: Add a way to run stream methods in a subprocess
Currently the frontend is getting stuck in an infinite loop.
Also contains debugging print statements which will need to be
stripped out.
---
src/buildstream/_frontend/app.py | 153 ++++++++++++++++----------------
src/buildstream/_scheduler/scheduler.py | 6 +-
src/buildstream/_stream.py | 67 ++++++++++----
3 files changed, 129 insertions(+), 97 deletions(-)
diff --git a/src/buildstream/_frontend/app.py b/src/buildstream/_frontend/app.py
index 9f90938..90070af 100644
--- a/src/buildstream/_frontend/app.py
+++ b/src/buildstream/_frontend/app.py
@@ -596,84 +596,83 @@ class App():
def _handle_failure(self, element, queue, failure):
- # Handle non interactive mode setting of what to do when a job fails.
- if not self._interactive_failures:
+ # # Handle non interactive mode setting of what to do when a job fails.
+ # if not self._interactive_failures:
- if self.context.sched_error_action == _SchedulerErrorAction.TERMINATE:
- self.stream.terminate()
- elif self.context.sched_error_action == _SchedulerErrorAction.QUIT:
- self.stream.quit()
- elif self.context.sched_error_action == _SchedulerErrorAction.CONTINUE:
- pass
- return
-
- assert False
- # Interactive mode for element failures
- with self._interrupted():
-
- summary = ("\n{} failure on element: {}\n".format(failure.action_name, element.name) +
- "\n" +
- "Choose one of the following options:\n" +
- " (c)ontinue - Continue queueing jobs as much as possible\n" +
- " (q)uit - Exit after all ongoing jobs complete\n" +
- " (t)erminate - Terminate any ongoing jobs and exit\n" +
- " (r)etry - Retry this job\n")
- if failure.logfile:
- summary += " (l)og - View the full log file\n"
- if failure.sandbox:
- summary += " (s)hell - Drop into a shell in the failed build sandbox\n"
- summary += "\nPressing ^C will terminate jobs and exit\n"
-
- choices = ['continue', 'quit', 'terminate', 'retry']
- if failure.logfile:
- choices += ['log']
- if failure.sandbox:
- choices += ['shell']
-
- choice = ''
- while choice not in ['continue', 'quit', 'terminate', 'retry']:
- click.echo(summary, err=True)
-
- self._notify("BuildStream failure", "{} on element {}"
- .format(failure.action_name, element.name))
-
- try:
- choice = click.prompt("Choice:", default='continue', err=True,
- value_proc=_prefix_choice_value_proc(choices))
- except click.Abort:
- # Ensure a newline after automatically printed '^C'
- click.echo("", err=True)
- choice = 'terminate'
-
- # Handle choices which you can come back from
- #
- assert choice != 'shell' # This won't work for now
- if choice == 'shell':
- click.echo("\nDropping into an interactive shell in the failed build sandbox\n", err=True)
- try:
- prompt = self.shell_prompt(element)
- self.stream.shell(element, Scope.BUILD, prompt, isolate=True, usebuildtree='always')
- except BstError as e:
- click.echo("Error while attempting to create interactive shell: {}".format(e), err=True)
- elif choice == 'log':
- with open(failure.logfile, 'r') as logfile:
- content = logfile.read()
- click.echo_via_pager(content)
-
- if choice == 'terminate':
- click.echo("\nTerminating all jobs\n", err=True)
- self.stream.terminate()
- else:
- if choice == 'quit':
- click.echo("\nCompleting ongoing tasks before quitting\n", err=True)
- self.stream.quit()
- elif choice == 'continue':
- click.echo("\nContinuing with other non failing elements\n", err=True)
- elif choice == 'retry':
- click.echo("\nRetrying failed job\n", err=True)
- # FIXME: Outstandingly nasty modification of core state
- queue._task_group.failed_tasks.remove(element._get_full_name())
- queue.enqueue([element])
+ if self.context.sched_error_action == 'terminate':
+ self.stream.terminate()
+ elif self.context.sched_error_action == 'quit':
+ self.stream.quit()
+ elif self.context.sched_error_action == 'continue':
+ pass
+ return
+
+ # assert False
+ # # Interactive mode for element failures
+ # with self._interrupted():
+
+ # summary = ("\n{} failure on element: {}\n".format(failure.action_name, element.name) +
+ # "\n" +
+ # "Choose one of the following options:\n" +
+ # " (c)ontinue - Continue queueing jobs as much as possible\n" +
+ # " (q)uit - Exit after all ongoing jobs complete\n" +
+ # " (t)erminate - Terminate any ongoing jobs and exit\n" +
+ # " (r)etry - Retry this job\n")
+ # if failure.logfile:
+ # summary += " (l)og - View the full log file\n"
+ # if failure.sandbox:
+ # summary += " (s)hell - Drop into a shell in the failed build sandbox\n"
+ # summary += "\nPressing ^C will terminate jobs and exit\n"
+
+ # choices = ['continue', 'quit', 'terminate', 'retry']
+ # if failure.logfile:
+ # choices += ['log']
+ # if failure.sandbox:
+ # choices += ['shell']
+
+ # choice = ''
+ # while choice not in ['continue', 'quit', 'terminate', 'retry']:
+ # click.echo(summary, err=True)
+
+ # self._notify("BuildStream failure", "{} on element {}"
+ # .format(failure.action_name, element.name))
+
+ # try:
+ # choice = click.prompt("Choice:", default='continue', err=True,
+ # value_proc=_prefix_choice_value_proc(choices))
+ # except click.Abort:
+ # # Ensure a newline after automatically printed '^C'
+ # click.echo("", err=True)
+ # choice = 'terminate'
+
+ # # Handle choices which you can come back from
+ # #
+ # assert choice != 'shell' # This won't work for now
+ # if choice == 'shell':
+ # click.echo("\nDropping into an interactive shell in the failed build sandbox\n", err=True)
+ # try:
+ # prompt = self.shell_prompt(element)
+ # self.stream.shell(element, Scope.BUILD, prompt, isolate=True, usebuildtree='always')
+ # except BstError as e:
+ # click.echo("Error while attempting to create interactive shell: {}".format(e), err=True)
+ # elif choice == 'log':
+ # with open(failure.logfile, 'r') as logfile:
+ # content = logfile.read()
+ # click.echo_via_pager(content)
+
+ # if choice == 'terminate':
+ # click.echo("\nTerminating all jobs\n", err=True)
+ # self.stream.terminate()
+ # else:
+ # if choice == 'quit':
+ # click.echo("\nCompleting ongoing tasks before quitting\n", err=True)
+ # self.stream.quit()
+ # elif choice == 'continue':
+ # click.echo("\nContinuing with other non failing elements\n", err=True)
+ # elif choice == 'retry':
+ # click.echo("\nRetrying failed job\n", err=True)
+ # queue.failed_elements.remove(element)
+ # queue.enqueue([element])
#
# Print the session heading if we've loaded a pipeline and there
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 14ecf30..2e9c740 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -52,6 +52,7 @@ class NotificationType(enum.Enum):
JOB_START = "job_start"
JOB_COMPLETE = "job_complete"
TICK = "tick"
+ EXCEPTION = "exception"
class Notification:
@@ -64,7 +65,9 @@ class Notification:
job_status=None,
failed_element=False,
elapsed_time=None,
- element=None):
+ element=None,
+ exception=None):
+
self.notification_type = notification_type
self.full_name = full_name
self.job_action = job_action
@@ -72,6 +75,7 @@ class Notification:
self.failed_element = failed_element
self.elapsed_time = elapsed_time
self.element = element
+ self.exception = exception
# Scheduler()
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index e8b3931..d2a4d08 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -19,6 +19,7 @@
# Jürg Billeter <ju...@codethink.co.uk>
# Tristan Maat <tr...@codethink.co.uk>
+import asyncio
import itertools
import functools
import multiprocessing as mp
@@ -31,6 +32,7 @@ import tarfile
import tempfile
from contextlib import contextmanager, suppress
from fnmatch import fnmatch
+import queue
from ._artifact import Artifact
from ._artifactelement import verify_artifact_ref, ArtifactElement
@@ -46,6 +48,40 @@ from . import utils, _yaml, _site
from . import Scope, Consistency
+# A decorator which causes the decorated method to be run in a subprocess
+def subprocessed(func):
+
+ @functools.wraps(func)
+ def _subprocessed(self, *args, **kwargs):
+ assert self
+ print("Args: {}".format([*args]))
+ print("Kwargs: {}".format(list(kwargs.items())))
+ assert not self._subprocess
+
+ # TODO use functools to pass arguments to func to make target for subprocess
+
+ # Start subprocessed work
+ mp_context = mp.get_context(method='spawn')
+ process_name = "stream-{}".format(func.__name__)
+ target = functools.partial(func, self, *args, **kwargs)
+ print("launching subprocess:", process_name)
+ self._subprocess = mp_context.Process(target=target, name=process_name)
+ self._subprocess.run()
+
+ # TODO connect signal handlers
+
+ # Run event loop. This event loop should exit once the
+ # subprocessed work has completed
+ print("Starting loop...")
+ while not self._subprocess.exitcode:
+ self._loop()
+ print("Stopping loop...")
+
+ # Return result of subprocessed function
+
+ return _subprocessed
+
+
# Stream()
#
# This is the main, toplevel calling interface in BuildStream core.
@@ -77,6 +113,7 @@ class Stream():
#
# Private members
#
+ self._subprocess = None
self._notification_queue = mp.Queue()
self._context = context
self._artifacts = None
@@ -245,6 +282,7 @@ class Stream():
# If `remote` specified as None, then regular configuration will be used
# to determine where to push artifacts to.
#
+ @subprocessed
def build(self, targets, *,
track_targets=None,
track_except=None,
@@ -1592,6 +1630,9 @@ class Stream():
else:
unique_id = None
self._state.fail_task(notification.job_action, notification.full_name, unique_id)
+ elif notification.notification_type == NotificationType.EXCEPTION:
+ # TODO
+ pass
else:
raise StreamError("Unreccognised notification type recieved")
@@ -1610,31 +1651,19 @@ class Stream():
#
raise TypeError("Stream objects should not be pickled.")
- # TODO
- # Causes the decorated method to be run in a subprocess
- @contextmanager
- def subprocessed(self, func, *args, **kwargs):
- pass
- # Set up event loop
-
- # Start subprocessed work
-
- # Run event loop. This event loop should exit once the
- # subprocessed work has completed
-
- # Return result of subprocessed function
-
# The code to be run by the Stream's event loop while delegating
- # work to a subprocess with the @subprocessed
+ # work to a subprocess with the @subprocessed decorator
def _loop(self):
assert self._notification_queue
- # Check that the subprocessed work has not finished
- # TODO
# Check for new messages
- notification = self._notification_queue.get(block=True, timeout=0.1)
+ try:
+ notification = self._notification_queue.get(block=True, timeout=0.1)
+ except queue.Empty:
+ notification = None
+ print("queue empty, continuing...")
# Process new messages
if notification:
+ print("handling notifications")
self._scheduler_notification_handler(notification)
-