You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by no...@apache.org on 2020/12/29 12:32:59 UTC

[buildstream] 09/22: WIP: spawn instead of fork

This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch aevri/win32_minimal_seemstowork_20190829
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 28951e285af67b98f657e31529963ddecc6dc9cd
Author: Angelos Evripiotis <je...@bloomberg.net>
AuthorDate: Tue Jul 9 13:46:41 2019 +0100

    WIP: spawn instead of fork
---
 src/buildstream/_messenger.py           | 12 +++++++-----
 src/buildstream/_scheduler/jobs/job.py  | 26 ++++++++++++++++----------
 src/buildstream/_scheduler/scheduler.py |  4 ++++
 3 files changed, 27 insertions(+), 15 deletions(-)

diff --git a/src/buildstream/_messenger.py b/src/buildstream/_messenger.py
index 7dec939..d74d391 100644
--- a/src/buildstream/_messenger.py
+++ b/src/buildstream/_messenger.py
@@ -179,7 +179,9 @@ class Messenger():
         # we also do not allow it in the main process.
         assert self._log_handle is None
         assert self._log_filename is None
-        assert not utils._is_main_process()
+
+        # TODO: re-enable this assertion once the global _main_pid var is dealt with.
+        # assert not utils._is_main_process()
 
         # Create the fully qualified logfile in the log directory,
         # appending the pid and .log extension at the end.
@@ -255,10 +257,10 @@ class Messenger():
         # If this message is associated with a plugin, print what
         # we know about the plugin.
         plugin_name = ""
-        if message.unique_id:
-            template += " {plugin}"
-            plugin = Plugin._lookup(message.unique_id)
-            plugin_name = plugin.name
+        # if message.unique_id:
+        #     template += " {plugin}"
+        #     plugin = Plugin._lookup(message.unique_id)
+        #     plugin_name = plugin.name
 
         template += ": {message}"
 
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index 47a8622..8365bd8 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -34,7 +34,7 @@ from ..._exceptions import ImplError, BstError, set_last_task_error, SkipJob
 from ..._message import Message, MessageType, unconditional_messages
 from ... import _signals, utils
 
-from .jobpickler import pickle_child_job
+from .jobpickler import pickle_child_job, unpickle_child_job
 
 
 # Return code values shutdown of job handling child processes
@@ -89,6 +89,11 @@ class _MessageType(enum.Enum):
     SUBCLASS_CUSTOM_MESSAGE = 5
 
 
+def _do_pickled_child_job(pickled, *child_args):
+    child_job = unpickle_child_job(pickled)
+    return child_job.child_action(*child_args)
+
+
 # Job()
 #
 # The Job object represents a task that will run in parallel to the main
@@ -165,7 +170,7 @@ class Job():
     #
     def start(self):
 
-        self._queue = multiprocessing.Queue()
+        self._queue = self._scheduler.manager.Queue()
 
         self._tries += 1
         self._parent_start_listening()
@@ -181,10 +186,11 @@ class Job():
             self._task_id,
         )
 
-        if 'BST_TEST_SUITE' in os.environ:
-            pickle_child_job(child_job, self._scheduler.context)
-
-        self._process = Process(target=child_job.child_action, args=[self._queue])
+        pickled = pickle_child_job(child_job, self._scheduler.context)
+        self._process = Process(
+            target=_do_pickled_child_job,
+            args=[pickled, self._queue],
+        )
 
         # Block signals which are handled in the main process such that
         # the child process does not inherit the parent's state, but the main
@@ -537,8 +543,8 @@ class Job():
         #      http://bugs.python.org/issue3831
         #
         if not self._listening:
-            self._scheduler.loop.add_reader(
-                self._queue._reader.fileno(), self._parent_recv)
+            # self._scheduler.loop.add_reader(
+            #     self._queue._reader.fileno(), self._parent_recv)
             self._listening = True
 
     # _parent_stop_listening()
@@ -547,7 +553,7 @@ class Job():
     #
     def _parent_stop_listening(self):
         if self._listening:
-            self._scheduler.loop.remove_reader(self._queue._reader.fileno())
+            # self._scheduler.loop.remove_reader(self._queue._reader.fileno())
             self._listening = False
 
 
@@ -830,7 +836,7 @@ class ChildJob():
     #    exit_code (_ReturnCode): The exit code to exit with
     #
     def _child_shutdown(self, exit_code):
-        self._queue.close()
+        # self._queue.close()
         assert isinstance(exit_code, _ReturnCode)
         sys.exit(int(exit_code))
 
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 00d6114..ae20fcd 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -72,6 +72,10 @@ class Scheduler():
                  interrupt_callback=None,
                  ticker_callback=None):
 
+        import multiprocessing
+        multiprocessing.set_start_method('spawn')
+        self.manager = multiprocessing.Manager()
+
         #
         # Public members
         #