You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by no...@apache.org on 2020/12/29 12:49:45 UTC

[buildstream] 11/12: pipe hack

This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch juerg/cache-query-job-benchmark
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 9068295c9a73197a5e49b9b83a8648850c0961af
Author: Jürg Billeter <j...@bitron.ch>
AuthorDate: Tue Dec 15 07:44:12 2020 +0100

    pipe hack
---
 src/buildstream/_scheduler/jobs/job.py | 19 +++++++++++++++----
 1 file changed, 15 insertions(+), 4 deletions(-)

diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index c7e2624..7579d24 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -145,10 +145,15 @@ class Job:
         assert not self._terminated, "Attempted to start process which was already terminated"
 
         # FIXME: remove this, this is not necessary when using asyncio
-        self._pipe_r, pipe_w = multiprocessing.Pipe(duplex=False)
+        silence = self.action_name == "Cache-query"
+        if silence:
+            self._pipe_r = pipe_w = None
+        else:
+            self._pipe_r, pipe_w = multiprocessing.Pipe(duplex=False)
 
         self._tries += 1
-        self._parent_start_listening()
+        if not silence:
+            self._parent_start_listening()
 
         # FIXME: remove the parent/child separation, it's not needed anymore.
         self._child = self.create_child_job(  # pylint: disable=assignment-from-no-return
@@ -350,7 +355,8 @@ class Job:
         self._scheduler.job_completed(self, status)
 
         # Force the deletion of the pipe and process objects to try and clean up FDs
-        self._pipe_r.close()
+        if self._pipe_r:
+            self._pipe_r.close()
         self._pipe_r = self._task = None
 
     # _parent_process_pipe()
@@ -359,6 +365,8 @@ class Job:
     # in the parent process.
     #
     def _parent_process_pipe(self):
+        if not self._pipe_r:
+            return
         while self._pipe_r.poll():
             try:
                 message = self._pipe_r.recv()
@@ -581,7 +589,8 @@ class ChildJob:
                 self._thread_id = None
                 return _ReturnCode.TERMINATED, None
             finally:
-                self._pipe_w.close()
+                if self._pipe_w:
+                    self._pipe_w.close()
 
     # terminate()
     #
@@ -620,6 +629,8 @@ class ChildJob:
     #    is_silenced (bool)   : Whether messages are silenced
     #
     def _child_message_handler(self, message, is_silenced):
+        if self.action_name == "Cache-query":
+            return
 
         message.action_name = self.action_name
         message.task_element_name = self._message_element_name