You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by gi...@apache.org on 2020/12/29 13:18:24 UTC

[buildstream] 12/13: plugin.py: Add a helper to run blocking processes in subprocesses

This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch bschubert/no-multiprocessing-full
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit d211abc07f5e56ad1f9e4a6a6b8804e67380e4fe
Author: Benjamin Schubert <co...@benschubert.me>
AuthorDate: Thu Jul 9 19:02:20 2020 +0100

    plugin.py: Add a helper to run blocking processes in subprocesses
    
    This ensures that we can cleanly clean up processes and threads on
    termination of BuildStream.
    
    Plugins should use this helper whenever there is a risk of them being
    blocked on a syscall for an indefinite amount of time.
---
 src/buildstream/downloadablefilesource.py | 97 +++++++++++++++++--------------
 src/buildstream/plugin.py                 | 28 ++++++++-
 2 files changed, 80 insertions(+), 45 deletions(-)

diff --git a/src/buildstream/downloadablefilesource.py b/src/buildstream/downloadablefilesource.py
index b9ca919..4875445 100644
--- a/src/buildstream/downloadablefilesource.py
+++ b/src/buildstream/downloadablefilesource.py
@@ -99,6 +99,34 @@ class _NetrcPasswordManager:
             return login, password
 
 
+def _download_file(opener, url, etag, directory):
+    default_name = os.path.basename(url)
+    request = urllib.request.Request(url)
+    request.add_header("Accept", "*/*")
+    request.add_header("User-Agent", "BuildStream/2")
+
+    if etag is not None:
+        request.add_header("If-None-Match", etag)
+
+    with contextlib.closing(opener.open(request)) as response:
+        info = response.info()
+
+        # some servers don't honor the 'If-None-Match' header
+        if etag and info["ETag"] == etag:
+            return None, None
+
+        etag = info["ETag"]
+
+        filename = info.get_filename(default_name)
+        filename = os.path.basename(filename)
+        local_file = os.path.join(directory, filename)
+        with open(local_file, "wb") as dest:
+            shutil.copyfileobj(response, dest)
+
+    return local_file, etag
+
+
+
 class DownloadableFileSource(Source):
     # pylint: disable=attribute-defined-outside-init
 
@@ -137,19 +165,18 @@ class DownloadableFileSource(Source):
         # there is no 'track' field in the source to determine what/whether
         # or not to update refs, because tracking a ref is always a conscious
         # decision by the user.
-        with self.timed_activity("Tracking {}".format(self.url), silent_nested=True):
-            new_ref = self._ensure_mirror()
+        new_ref = self._ensure_mirror("Tracking {}".format(self.url))
 
-            if self.ref and self.ref != new_ref:
-                detail = (
-                    "When tracking, new ref differs from current ref:\n"
-                    + "  Tracked URL: {}\n".format(self.url)
-                    + "  Current ref: {}\n".format(self.ref)
-                    + "  New ref: {}\n".format(new_ref)
-                )
-                self.warn("Potential man-in-the-middle attack!", detail=detail)
+        if self.ref and self.ref != new_ref:
+            detail = (
+                "When tracking, new ref differs from current ref:\n"
+                + "  Tracked URL: {}\n".format(self.url)
+                + "  Current ref: {}\n".format(self.ref)
+                + "  New ref: {}\n".format(new_ref)
+            )
+            self.warn("Potential man-in-the-middle attack!", detail=detail)
 
-            return new_ref
+        return new_ref
 
     def fetch(self):  # pylint: disable=arguments-differ
 
@@ -162,12 +189,11 @@ class DownloadableFileSource(Source):
 
         # Download the file, raise hell if the sha256sums don't match,
         # and mirror the file otherwise.
-        with self.timed_activity("Fetching {}".format(self.url), silent_nested=True):
-            sha256 = self._ensure_mirror()
-            if sha256 != self.ref:
-                raise SourceError(
-                    "File downloaded from {} has sha256sum '{}', not '{}'!".format(self.url, sha256, self.ref)
-                )
+        sha256 = self._ensure_mirror("Fetching {}".format(self.url),)
+        if sha256 != self.ref:
+            raise SourceError(
+                "File downloaded from {} has sha256sum '{}', not '{}'!".format(self.url, sha256, self.ref)
+            )
 
     def _warn_deprecated_etag(self, node):
         etag = node.get_str("etag", None)
@@ -188,40 +214,23 @@ class DownloadableFileSource(Source):
         with utils.save_file_atomic(etagfilename) as etagfile:
             etagfile.write(etag)
 
-    def _ensure_mirror(self):
+    def _ensure_mirror(self, activity_name: str):
         # Downloads from the url and caches it according to its sha256sum.
         try:
             with self.tempdir() as td:
-                default_name = os.path.basename(self.url)
-                request = urllib.request.Request(self.url)
-                request.add_header("Accept", "*/*")
-                request.add_header("User-Agent", "BuildStream/2")
-
                 # We do not use etag in case what we have in cache is
                 # not matching ref in order to be able to recover from
                 # corrupted download.
-                if self.ref:
-                    etag = self._get_etag(self.ref)
-
+                if self.ref and not self.is_cached():
                     # Do not re-download the file if the ETag matches.
-                    if etag and self.is_cached():
-                        request.add_header("If-None-Match", etag)
-
-                opener = self.__get_urlopener()
-                with contextlib.closing(opener.open(request)) as response:
-                    info = response.info()
-
-                    # some servers don't honor the 'If-None-Match' header
-                    if self.ref and etag and info["ETag"] == etag:
-                        return self.ref
+                    etag = self._get_etag(self.ref)
+                else:
+                    etag = None
 
-                    etag = info["ETag"]
+                local_file, new_etag = self.blocking_activity(_download_file, (self.__get_urlopener(), self.url, etag, td), activity_name)
 
-                    filename = info.get_filename(default_name)
-                    filename = os.path.basename(filename)
-                    local_file = os.path.join(td, filename)
-                    with open(local_file, "wb") as dest:
-                        shutil.copyfileobj(response, dest)
+                if local_file is None:
+                    return self.ref
 
                 # Make sure url-specific mirror dir exists.
                 if not os.path.isdir(self._mirror_dir):
@@ -233,8 +242,8 @@ class DownloadableFileSource(Source):
                 # In case the old file was corrupted somehow.
                 os.rename(local_file, self._get_mirror_file(sha256))
 
-                if etag:
-                    self._store_etag(sha256, etag)
+                if new_etag:
+                    self._store_etag(sha256, new_etag)
                 return sha256
 
         except urllib.error.HTTPError as e:
diff --git a/src/buildstream/plugin.py b/src/buildstream/plugin.py
index deb105a..0ed6d7d 100644
--- a/src/buildstream/plugin.py
+++ b/src/buildstream/plugin.py
@@ -110,11 +110,12 @@ Class Reference
 """
 
 import itertools
+import multiprocessing
 import os
 import subprocess
 import sys
 from contextlib import contextmanager
-from typing import Generator, Optional, Tuple, TYPE_CHECKING
+from typing import Callable, Generator, Optional, Tuple, TypeVar, TYPE_CHECKING
 from weakref import WeakValueDictionary
 
 from . import utils
@@ -131,6 +132,14 @@ if TYPE_CHECKING:
     # pylint: enable=cyclic-import
 
 
+T1 = TypeVar("T1")
+T2 = TypeVar("T2")
+
+
+def _background_job_wrapper(queue: multiprocessing.Queue, target: Callable[[T1], T2], args: T1) -> None:
+    queue.put(target(*args))
+
+
 class Plugin:
     """Plugin()
 
@@ -212,6 +221,8 @@ class Plugin:
     # scheduling tasks.
     __TABLE = WeakValueDictionary()  # type: WeakValueDictionary[int, Plugin]
 
+    __multiprocessing_context = multiprocessing.get_context("spawn")
+
     def __init__(
         self,
         name: str,
@@ -503,6 +514,21 @@ class Plugin:
         ):
             yield
 
+    def blocking_activity(self, target: Callable[[T1], T2], args: T1, activity_name: str, *, detail: Optional[str] = None, silent_nested: bool = False) -> T2:
+        with self.__context.messenger.timed_activity(
+            activity_name, element_name=self._get_full_name(), detail=detail, silent_nested=silent_nested
+        ):
+            queue = self.__multiprocessing_context.Queue()
+
+            proc = self.__multiprocessing_context.Process(target=_background_job_wrapper, args=(queue, target, args))
+            proc.start()
+
+            result = queue.get()
+            proc.join()
+
+            return result
+
+
     def call(self, *popenargs, fail: Optional[str] = None, fail_temporarily: bool = False, **kwargs) -> int:
         """A wrapper for subprocess.call()