You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by gi...@apache.org on 2020/12/29 13:21:57 UTC

[buildstream] 07/33: WIP: spawn: use multiprocessing.Manager

This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch aevri/picklable_jobs
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 82efed60fcb3d4f9227c3de3bf1649aa8d78a4ab
Author: Angelos Evripiotis <je...@bloomberg.net>
AuthorDate: Tue Apr 2 13:29:49 2019 +0100

    WIP: spawn: use multiprocessing.Manager
    
    Note that the queue changes went into 4d0a2d60c6ac01695b39871002d990646c6fc327
    for pickling Job, ElementJob, etc.
---
 src/buildstream/_scheduler/jobs/job.py  | 21 ++++++++++++---------
 src/buildstream/_scheduler/scheduler.py |  4 ++++
 2 files changed, 16 insertions(+), 9 deletions(-)

diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index 6b6f45b..c15ce04 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -121,7 +121,7 @@ class Job():
     #
     def spawn(self):
 
-        self._queue = multiprocessing.Queue()
+        self._queue = self._scheduler.manager.Queue()
 
         self._tries += 1
         self._parent_start_listening()
@@ -490,19 +490,22 @@ class Job():
         #
         #      http://bugs.python.org/issue3831
         #
-        if not self._listening:
-            self._scheduler.loop.add_reader(
-                self._queue._reader.fileno(), self._parent_recv)
-            self._listening = True
+
+        # if not self._listening:
+        #     self._scheduler.loop.add_reader(
+        #         self._queue._reader.fileno(), self._parent_recv)
+        #     self._listening = True
+        pass
 
     # _parent_stop_listening()
     #
     # Stops listening on the message queue
     #
     def _parent_stop_listening(self):
-        if self._listening:
-            self._scheduler.loop.remove_reader(self._queue._reader.fileno())
-            self._listening = False
+        # if self._listening:
+        #     self._scheduler.loop.remove_reader(self._queue._reader.fileno())
+        #     self._listening = False
+        pass
 
 
 class ChildJob():
@@ -732,7 +735,7 @@ class ChildJob():
     #    exit_code (int): The exit code to exit with
     #
     def _child_shutdown(self, exit_code):
-        self._queue.close()
+        # self._queue.close()
         sys.exit(exit_code)
 
     # _child_message_handler()
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 50ad7f0..12acdd5 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -75,6 +75,10 @@ class Scheduler():
                  job_start_callback=None,
                  job_complete_callback=None):
 
+        import multiprocessing
+        multiprocessing.set_start_method('spawn')
+        self.manager = multiprocessing.Manager()
+
         #
         # Public members
         #