You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by gi...@apache.org on 2020/12/29 13:21:57 UTC
[buildstream] 07/33: WIP: spawn: use multiprocessing.Manager
This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch aevri/picklable_jobs
in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit 82efed60fcb3d4f9227c3de3bf1649aa8d78a4ab
Author: Angelos Evripiotis <je...@bloomberg.net>
AuthorDate: Tue Apr 2 13:29:49 2019 +0100
WIP: spawn: use multiprocessing.Manager
Note that the queue changes needed for pickling Job, ElementJob, etc. went
into commit 4d0a2d60c6ac01695b39871002d990646c6fc327.
---
src/buildstream/_scheduler/jobs/job.py | 21 ++++++++++++---------
src/buildstream/_scheduler/scheduler.py | 4 ++++
2 files changed, 16 insertions(+), 9 deletions(-)
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index 6b6f45b..c15ce04 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -121,7 +121,7 @@ class Job():
#
def spawn(self):
- self._queue = multiprocessing.Queue()
+ self._queue = self._scheduler.manager.Queue()
self._tries += 1
self._parent_start_listening()
@@ -490,19 +490,22 @@ class Job():
#
# http://bugs.python.org/issue3831
#
- if not self._listening:
- self._scheduler.loop.add_reader(
- self._queue._reader.fileno(), self._parent_recv)
- self._listening = True
+
+ # if not self._listening:
+ # self._scheduler.loop.add_reader(
+ # self._queue._reader.fileno(), self._parent_recv)
+ # self._listening = True
+ pass
# _parent_stop_listening()
#
# Stops listening on the message queue
#
def _parent_stop_listening(self):
- if self._listening:
- self._scheduler.loop.remove_reader(self._queue._reader.fileno())
- self._listening = False
+ # if self._listening:
+ # self._scheduler.loop.remove_reader(self._queue._reader.fileno())
+ # self._listening = False
+ pass
class ChildJob():
@@ -732,7 +735,7 @@ class ChildJob():
# exit_code (int): The exit code to exit with
#
def _child_shutdown(self, exit_code):
- self._queue.close()
+ # self._queue.close()
sys.exit(exit_code)
# _child_message_handler()
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 50ad7f0..12acdd5 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -75,6 +75,10 @@ class Scheduler():
job_start_callback=None,
job_complete_callback=None):
+ import multiprocessing
+ multiprocessing.set_start_method('spawn')
+ self.manager = multiprocessing.Manager()
+
#
# Public members
#