You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by no...@apache.org on 2020/12/29 12:30:16 UTC

[buildstream] 08/21: WIP: spawn instead of fork

This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch aevri/win32
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 3940d1193179e0416b5e61f74bac4d4489c0d846
Author: Angelos Evripiotis <je...@bloomberg.net>
AuthorDate: Tue Jun 18 15:51:13 2019 +0100

    WIP: spawn instead of fork
---
 src/buildstream/_context.py                     | 12 +++---
 src/buildstream/_scheduler/jobs/job.py          | 49 ++++++++++---------------
 src/buildstream/_scheduler/queues/buildqueue.py |  5 ++-
 src/buildstream/_scheduler/scheduler.py         |  4 ++
 4 files changed, 33 insertions(+), 37 deletions(-)

diff --git a/src/buildstream/_context.py b/src/buildstream/_context.py
index d9ef1f1..2e1d0ee 100644
--- a/src/buildstream/_context.py
+++ b/src/buildstream/_context.py
@@ -584,7 +584,9 @@ class Context():
         # we also do not allow it in the main process.
         assert self._log_handle is None
         assert self._log_filename is None
-        assert not utils._is_main_process()
+
+        # Need to deal with global _main_pid var.
+        # assert not utils._is_main_process()
 
         # Create the fully qualified logfile in the log directory,
         # appending the pid and .log extension at the end.
@@ -679,10 +681,10 @@ class Context():
         # If this message is associated with a plugin, print what
         # we know about the plugin.
         plugin_name = ""
-        if message.unique_id:
-            template += " {plugin}"
-            plugin = Plugin._lookup(message.unique_id)
-            plugin_name = plugin.name
+        # if message.unique_id:
+        #     template += " {plugin}"
+        #     plugin = Plugin._lookup(message.unique_id)
+        #     plugin_name = plugin.name
 
         template += ": {message}"
 
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index 440a85b..ebbbfa6 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -244,7 +244,7 @@ class Job():
     #
     def start(self):
 
-        self._queue = multiprocessing.Queue()
+        self._queue = self._scheduler.manager.Queue()
 
         self._tries += 1
         self._parent_start_listening()
@@ -259,32 +259,18 @@ class Job():
             self._task_id,
         )
 
-        self._process = Process(target=child_job.child_action, args=[self._queue])
-
-        import contextlib
-        import time
-        @contextlib.contextmanager
-        def timer(message):
-            then = time.time()
-            yield
-            now = time.time()
-            print(f"({now - then:,.2}s):", message)
-
-        import buildstream.testpickle
-        with timer(f"Pickle {self._child_action}"):
-            pickled_process = buildstream.testpickle.test_pickle_direct(self._child_action)
-        print(f"Size of pickled data: {len(pickled_process.getbuffer()):,}")
-        import pickle
-        pickled_process.seek(0)
-        # unpickled_process = pickle.load(pickled_process)
+        pickled = _pickle_child_job(child_job, self._scheduler.context)
+        self._process = Process(
+            target=_do_pickled_child_job,
+            args=[pickled, self._queue],
+        )
 
         # Block signals which are handled in the main process such that
         # the child process does not inherit the parent's state, but the main
         # process will be notified of any signal after we launch the child.
         #
-        with timer(f"process.start {self}"):
-            with _signals.blocked([signal.SIGINT, signal.SIGTSTP, signal.SIGTERM], ignore=False):
-                self._process.start()
+        with _signals.blocked([signal.SIGINT, signal.SIGTSTP, signal.SIGTERM], ignore=False):
+            self._process.start()
 
         # Wait for the child task to complete.
         #
@@ -629,19 +615,22 @@ class Job():
         #
         #      http://bugs.python.org/issue3831
         #
-        if not self._listening:
-            self._scheduler.loop.add_reader(
-                self._queue._reader.fileno(), self._parent_recv)
-            self._listening = True
+
+        # if not self._listening:
+        #     self._scheduler.loop.add_reader(
+        #         self._queue._reader.fileno(), self._parent_recv)
+        #     self._listening = True
+        pass
 
     # _parent_stop_listening()
     #
     # Stops listening on the message queue
     #
     def _parent_stop_listening(self):
-        if self._listening:
-            self._scheduler.loop.remove_reader(self._queue._reader.fileno())
-            self._listening = False
+        # if self._listening:
+        #     self._scheduler.loop.remove_reader(self._queue._reader.fileno())
+        #     self._listening = False
+        pass
 
 
 # ChildJob()
@@ -922,7 +911,7 @@ class ChildJob():
     #    exit_code (_ReturnCode): The exit code to exit with
     #
     def _child_shutdown(self, exit_code):
-        self._queue.close()
+        # self._queue.close()
         assert isinstance(exit_code, _ReturnCode)
         sys.exit(int(exit_code))
 
diff --git a/src/buildstream/_scheduler/queues/buildqueue.py b/src/buildstream/_scheduler/queues/buildqueue.py
index b280661..ff65158 100644
--- a/src/buildstream/_scheduler/queues/buildqueue.py
+++ b/src/buildstream/_scheduler/queues/buildqueue.py
@@ -108,8 +108,9 @@ class BuildQueue(Queue):
         #        artifact cache size for a successful build even though we know a
         #        failed build also grows the artifact cache size.
         #
-        if status is JobStatus.OK:
-            self._check_cache_size(job, element, result)
+
+        # if status is JobStatus.OK:
+        #     self._check_cache_size(job, element, result)
 
     def register_pending_element(self, element):
         # Set a "buildable" callback for an element not yet ready
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 0ee6293..8a14391 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -75,6 +75,10 @@ class Scheduler():
                  job_start_callback=None,
                  job_complete_callback=None):
 
+        import multiprocessing
+        multiprocessing.set_start_method('spawn')
+        self.manager = multiprocessing.Manager()
+
         #
         # Public members
         #