You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by no...@apache.org on 2020/12/29 12:31:08 UTC

[buildstream] 14/19: fixup! Move subprocess machinery into a method

This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch phil/ui-split-refactor
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit f39f4ef0e541f50911fb695e7848a2e6482f2011
Author: Phil Dawson <ph...@codethink.co.uk>
AuthorDate: Fri Jul 5 14:21:30 2019 +0100

    fixup! Move subprocess machinery into a method
---
 src/buildstream/_stream.py | 94 +++++++++++-----------------------------------
 1 file changed, 22 insertions(+), 72 deletions(-)

diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index e4f6bc0..1de8364 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -47,46 +47,6 @@ from . import utils, _yaml, _site
 from . import Scope, Consistency
 
 
-def _subprocessed(self, *args, **kwargs):
-    assert self
-    print("Args: {}".format([*args]))
-    print("Kwargs: {}".format(list(kwargs.items())))
-    assert not self._subprocess
-
-    global notification_count 
-    notification_count = 0
-    # TODO use functools to pass arguments to func to make target for subprocess
-
-    # Start subprocessed work
-    mp_context = mp.get_context(method='spawn')
-    process_name = "stream-{}".format(func.__name__)
-    print("launchinglaunching subprocess:", process_name)
-    print(func.__module__, func.__name__)
-    import buildstream
-    try:
-        assert func is buildstream._stream.Stream.build or func is Stream.build
-    except AssertionError:
-        print(func, func.__qualname__, func.__name__, func.__module__, id(func))
-    self._subprocess = mp_context.Process(target=func, args=args, kwargs=kwargs, name=process_name)
-    self._subprocess.start()
-
-    # TODO connect signal handlers
-
-    self._subprocess = mp_context.Process(target=func, args=args, kwargs=kwargs, name=process_name)
-
-    print("Starting loop...")
-    while not self._subprocess.exitcode:
-        self._loop()
-    print("Stopping loop...")
-
-    try:
-        while True:
-            notification = self.notification_queue.get()
-            self._scheduler_notification_handler(notification)
-    except queue.Empty:
-        pass
-
-
 # Stream()
 #
 # This is the main, toplevel calling interface in BuildStream core.
@@ -146,15 +106,15 @@ class Stream():
     def init(self):
         self._artifacts = self._context.artifactcache
         self._sourcecache = self._context.sourcecache
-        print(Stream.build, Stream.build.__qualname__, Stream.build.__name__, Stream.build.__module__, id(Stream.build))
-
+        print(Stream.build, Stream.build.__qualname__, Stream.build.__name__, Stream.build.__module__,
+              id(Stream.build))
 
     def run_in_subprocess(self, func, *args, **kwargs):
         print("Args: {}".format([*args]))
         print("Kwargs: {}".format(list(kwargs.items())))
         assert not self._subprocess
 
-        global notification_count 
+        global notification_count
         notification_count = 0
         # TODO use functools to pass arguments to func to make target for subprocess
 
@@ -162,30 +122,22 @@ class Stream():
         mp_context = mp.get_context(method='fork')
         process_name = "stream-{}".format(func.__name__)
         print("launchinglaunching subprocess:", process_name)
-        print(func.__module__, func.__name__)
-        import buildstream
-        try:
-            assert func is buildstream._stream.Stream.build or func is Stream.build
-        except AssertionError:
-            print(func, func.__qualname__, func.__name__, func.__module__, id(func))
         self._subprocess = mp_context.Process(target=func, args=args, kwargs=kwargs, name=process_name)
         self._subprocess.start()
 
         # TODO connect signal handlers
-
-        self._subprocess = mp_context.Process(target=func, args=args, kwargs=kwargs, name=process_name)
-
-        print("Starting loop...")
-        while not self._subprocess.exitcode:
+        while self._subprocess.exitcode is not None:
+            self._subprocess.join(0.1)
             self._loop()
         print("Stopping loop...")
 
-        try:
-            while True:
-                notification = self.notification_queue.get()
-                self._scheduler_notification_handler(notification)
-        except queue.Empty:
-            pass
+        # try:
+        #     while True:
+        #         notification = self._notification_queue.get_nowait()
+        #         self._scheduler_notification_handler(notification)
+        # except queue.Empty:
+        #     print("Finished processing notifications")
+        #     pass
 
     # cleanup()
     #
@@ -312,6 +264,7 @@ class Stream():
 
     def build(self, *args, **kwargs):
         self.run_in_subprocess(self._build, *args, **kwargs)
+
     # build()
     #
     # Builds (assembles) elements in the pipeline.
@@ -1701,15 +1654,12 @@ class Stream():
     # work to a subprocess with the @subprocessed decorator
     def _loop(self):
         assert self._notification_queue
-
-        # Check for new messages
-        try:
-            notification = self._notification_queue.get(block=True, timeout=0.1)
-        except queue.Empty:
-            notification = None
-            print("queue empty, continuing...")
-
-        # Process new messages
-        if notification:
-            print("handling notifications")
-            self._scheduler_notification_handler(notification)
+        # Check for and process new messages
+        while True:
+            try:
+                notification = self._notification_queue.get_nowait()
+                print("handling notifications")
+                self._scheduler_notification_handler(notification)
+            except queue.Empty:
+                notification = None
+                break