You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by no...@apache.org on 2020/12/29 12:31:08 UTC
[buildstream] 14/19: fixup! Move subprocess machinery into a method
This is an automated email from the ASF dual-hosted git repository.
not-in-ldap pushed a commit to branch phil/ui-split-refactor
in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit f39f4ef0e541f50911fb695e7848a2e6482f2011
Author: Phil Dawson <ph...@codethink.co.uk>
AuthorDate: Fri Jul 5 14:21:30 2019 +0100
fixup! Move subprocess machinery into a method
---
src/buildstream/_stream.py | 94 +++++++++++-----------------------------------
1 file changed, 22 insertions(+), 72 deletions(-)
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index e4f6bc0..1de8364 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -47,46 +47,6 @@ from . import utils, _yaml, _site
from . import Scope, Consistency
-def _subprocessed(self, *args, **kwargs):
- assert self
- print("Args: {}".format([*args]))
- print("Kwargs: {}".format(list(kwargs.items())))
- assert not self._subprocess
-
- global notification_count
- notification_count = 0
- # TODO use functools to pass arguments to func to make target for subprocess
-
- # Start subprocessed work
- mp_context = mp.get_context(method='spawn')
- process_name = "stream-{}".format(func.__name__)
- print("launchinglaunching subprocess:", process_name)
- print(func.__module__, func.__name__)
- import buildstream
- try:
- assert func is buildstream._stream.Stream.build or func is Stream.build
- except AssertionError:
- print(func, func.__qualname__, func.__name__, func.__module__, id(func))
- self._subprocess = mp_context.Process(target=func, args=args, kwargs=kwargs, name=process_name)
- self._subprocess.start()
-
- # TODO connect signal handlers
-
- self._subprocess = mp_context.Process(target=func, args=args, kwargs=kwargs, name=process_name)
-
- print("Starting loop...")
- while not self._subprocess.exitcode:
- self._loop()
- print("Stopping loop...")
-
- try:
- while True:
- notification = self.notification_queue.get()
- self._scheduler_notification_handler(notification)
- except queue.Empty:
- pass
-
-
# Stream()
#
# This is the main, toplevel calling interface in BuildStream core.
@@ -146,15 +106,15 @@ class Stream():
def init(self):
self._artifacts = self._context.artifactcache
self._sourcecache = self._context.sourcecache
- print(Stream.build, Stream.build.__qualname__, Stream.build.__name__, Stream.build.__module__, id(Stream.build))
-
+ print(Stream.build, Stream.build.__qualname__, Stream.build.__name__, Stream.build.__module__,
+ id(Stream.build))
def run_in_subprocess(self, func, *args, **kwargs):
print("Args: {}".format([*args]))
print("Kwargs: {}".format(list(kwargs.items())))
assert not self._subprocess
- global notification_count
+ global notification_count
notification_count = 0
# TODO use functools to pass arguments to func to make target for subprocess
@@ -162,30 +122,22 @@ class Stream():
mp_context = mp.get_context(method='fork')
process_name = "stream-{}".format(func.__name__)
print("launchinglaunching subprocess:", process_name)
- print(func.__module__, func.__name__)
- import buildstream
- try:
- assert func is buildstream._stream.Stream.build or func is Stream.build
- except AssertionError:
- print(func, func.__qualname__, func.__name__, func.__module__, id(func))
self._subprocess = mp_context.Process(target=func, args=args, kwargs=kwargs, name=process_name)
self._subprocess.start()
# TODO connect signal handlers
-
- self._subprocess = mp_context.Process(target=func, args=args, kwargs=kwargs, name=process_name)
-
- print("Starting loop...")
- while not self._subprocess.exitcode:
+ while self._subprocess.exitcode is None:  # exitcode stays None until the child has terminated
+ self._subprocess.join(0.1)
self._loop()
print("Stopping loop...")
- try:
- while True:
- notification = self.notification_queue.get()
- self._scheduler_notification_handler(notification)
- except queue.Empty:
- pass
+ # try:
+ # while True:
+ # notification = self._notification_queue.get_nowait()
+ # self._scheduler_notification_handler(notification)
+ # except queue.Empty:
+ # print("Finished processing notifications")
+ # pass
# cleanup()
#
@@ -312,6 +264,7 @@ class Stream():
def build(self, *args, **kwargs):
self.run_in_subprocess(self._build, *args, **kwargs)
+
# build()
#
# Builds (assembles) elements in the pipeline.
@@ -1701,15 +1654,12 @@ class Stream():
# work to a subprocess with the @subprocessed decorator
def _loop(self):
assert self._notification_queue
-
- # Check for new messages
- try:
- notification = self._notification_queue.get(block=True, timeout=0.1)
- except queue.Empty:
- notification = None
- print("queue empty, continuing...")
-
- # Process new messages
- if notification:
- print("handling notifications")
- self._scheduler_notification_handler(notification)
+ # Check for and process new messages
+ while True:
+ try:
+ notification = self._notification_queue.get_nowait()
+ print("handling notifications")
+ self._scheduler_notification_handler(notification)
+ except queue.Empty:
+ notification = None
+ break