You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by no...@apache.org on 2020/12/29 12:31:07 UTC

[buildstream] 13/19: Move subprocess machinery into a method

This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch phil/ui-split-refactor
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 8b32793fa8ce95273ee034829166e0697363d00e
Author: Phil Dawson <ph...@codethink.co.uk>
AuthorDate: Fri Jul 5 11:26:04 2019 +0100

    Move subprocess machinery into a method
    
    This gets around the pickling identity issues
---
 src/buildstream/_stream.py | 128 ++++++++++++++++++++++++++++++---------------
 1 file changed, 87 insertions(+), 41 deletions(-)

diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 0fe2234..e4f6bc0 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -19,7 +19,6 @@
 #        Jürg Billeter <ju...@codethink.co.uk>
 #        Tristan Maat <tr...@codethink.co.uk>
 
-import asyncio
 import itertools
 import functools
 import multiprocessing as mp
@@ -48,38 +47,44 @@ from . import utils, _yaml, _site
 from . import Scope, Consistency
 
 
-# A decorator which runs the decorated method to be run in a subprocess
-def subprocessed(func):
-
-    @functools.wraps(func)
-    def _subprocessed(self, *args, **kwargs):
-        assert self
-        print("Args: {}".format([*args]))
-        print("Kwargs: {}".format(list(kwargs.items())))
-        assert not self._subprocess
-
-        # TODO use functools to pass arguments to func to make target for subprocess
-
-        # Start subprocessed work
-        mp_context = mp.get_context(method='spawn')
-        process_name = "stream-{}".format(func.__name__)
-        target = functools.partial(func, self, *args, **kwargs)
-        print("launching subprocess:", process_name)
-        self._subprocess = mp_context.Process(target=target, name=process_name)
-        self._subprocess.start()
-
-        # TODO connect signal handlers
-
-        # Run event loop. This event loop should exit once the
-        # subprocessed work has completed
-        print("Starting loop...")
-        while not self._subprocess.exitcode:
-            self._loop()
-        print("Stopping loop...")
-
-        # Return result of subprocessed function
-
-    return _subprocessed
+def _subprocessed(self, *args, **kwargs):
+    assert self
+    print("Args: {}".format([*args]))
+    print("Kwargs: {}".format(list(kwargs.items())))
+    assert not self._subprocess
+
+    global notification_count 
+    notification_count = 0
+    # TODO use functools to pass arguments to func to make target for subprocess
+
+    # Start subprocessed work
+    mp_context = mp.get_context(method='spawn')
+    process_name = "stream-{}".format(func.__name__)
+    print("launchinglaunching subprocess:", process_name)
+    print(func.__module__, func.__name__)
+    import buildstream
+    try:
+        assert func is buildstream._stream.Stream.build or func is Stream.build
+    except AssertionError:
+        print(func, func.__qualname__, func.__name__, func.__module__, id(func))
+    self._subprocess = mp_context.Process(target=func, args=args, kwargs=kwargs, name=process_name)
+    self._subprocess.start()
+
+    # TODO connect signal handlers
+
+    self._subprocess = mp_context.Process(target=func, args=args, kwargs=kwargs, name=process_name)
+
+    print("Starting loop...")
+    while not self._subprocess.exitcode:
+        self._loop()
+    print("Stopping loop...")
+
+    try:
+        while True:
+            notification = self.notification_queue.get()
+            self._scheduler_notification_handler(notification)
+    except queue.Empty:
+        pass
 
 
 # Stream()
@@ -141,6 +146,46 @@ class Stream():
     def init(self):
         self._artifacts = self._context.artifactcache
         self._sourcecache = self._context.sourcecache
+        print(Stream.build, Stream.build.__qualname__, Stream.build.__name__, Stream.build.__module__, id(Stream.build))
+
+
+    def run_in_subprocess(self, func, *args, **kwargs):
+        print("Args: {}".format([*args]))
+        print("Kwargs: {}".format(list(kwargs.items())))
+        assert not self._subprocess
+
+        global notification_count 
+        notification_count = 0
+        # TODO use functools to pass arguments to func to make target for subprocess
+
+        # Start subprocessed work
+        mp_context = mp.get_context(method='fork')
+        process_name = "stream-{}".format(func.__name__)
+        print("launchinglaunching subprocess:", process_name)
+        print(func.__module__, func.__name__)
+        import buildstream
+        try:
+            assert func is buildstream._stream.Stream.build or func is Stream.build
+        except AssertionError:
+            print(func, func.__qualname__, func.__name__, func.__module__, id(func))
+        self._subprocess = mp_context.Process(target=func, args=args, kwargs=kwargs, name=process_name)
+        self._subprocess.start()
+
+        # TODO connect signal handlers
+
+        self._subprocess = mp_context.Process(target=func, args=args, kwargs=kwargs, name=process_name)
+
+        print("Starting loop...")
+        while not self._subprocess.exitcode:
+            self._loop()
+        print("Stopping loop...")
+
+        try:
+            while True:
+                notification = self.notification_queue.get()
+                self._scheduler_notification_handler(notification)
+        except queue.Empty:
+            pass
 
     # cleanup()
     #
@@ -265,6 +310,8 @@ class Stream():
         return element._shell(scope, directory, mounts=mounts, isolate=isolate, prompt=prompt, command=command,
                               usebuildtree=buildtree)
 
+    def build(self, *args, **kwargs):
+        self.run_in_subprocess(self._build, *args, **kwargs)
     # build()
     #
     # Builds (assembles) elements in the pipeline.
@@ -282,14 +329,13 @@ class Stream():
     # If `remote` specified as None, then regular configuration will be used
     # to determine where to push artifacts to.
     #
-    @subprocessed
-    def build(self, targets, *,
-              track_targets=None,
-              track_except=None,
-              track_cross_junctions=False,
-              ignore_junction_targets=False,
-              build_all=False,
-              remote=None):
+    def _build(self, targets, *,
+               track_targets=None,
+               track_except=None,
+               track_cross_junctions=False,
+               ignore_junction_targets=False,
+               build_all=False,
+               remote=None):
 
         if build_all:
             selection = PipelineSelection.ALL