You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by tv...@apache.org on 2021/02/04 08:02:33 UTC

[buildstream] branch aevri/win32_receive_signals created (now 739753b)

This is an automated email from the ASF dual-hosted git repository.

tvb pushed a change to branch aevri/win32_receive_signals
in repository https://gitbox.apache.org/repos/asf/buildstream.git.


      at 739753b  cas: localhost tcp/ip connection when on win32

This branch includes the following new commits:

     new 83a538e  testing/.../site: windows-friendly HAVE_OLD_GIT
     new 2d3eac0  testing/.../site: del unnecessary global 'version'
     new ca9dd39  _platform: add does_support_signals() accessor
     new 1270704  _signals.suspendable:"outermost" -> "is_outermost"
     new 23d1f13  _signals.terminator: "outermost" -> "is_outermost"
     new bfa2763  _signals.suspendable: early-out on win32
     new 6c99e0e  jobs/job: use named parameters for long list
     new 075cf2a  job: no redundant signal code on win32
     new df3f398  scheduler: no redundant signal code on win32
     new d80a5f5  _signals.blocked: early-out on win32
     new 9989422  cascache: extract CASDProcess in new module
     new 5c159ff  CASDProcessManager: 'release_resources' convention
     new e1c16f3  cascache: refactor, extract "connection_string"
     new 7b51e4a  casdprocessmanager: extract _UnixSocketConnection
     new 7d0bff1  CASDProcessManager: expect return code 1 on win32
     new 739753b  cas: localhost tcp/ip connection when on win32

The 16 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[buildstream] 14/16: casdprocessmanager: extract _UnixSocketConnection

Posted by tv...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tvb pushed a commit to branch aevri/win32_receive_signals
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 7b51e4a2a3294adf62e37c5774a587282500b5a5
Author: Angelos Evripiotis <je...@bloomberg.net>
AuthorDate: Fri Oct 11 14:02:11 2019 +0100

    casdprocessmanager: extract _UnixSocketConnection
    
    Prepare for different kinds of connection to buildbox-casd by creating
    an abstraction over connection type.
---
 src/buildstream/_cas/casdprocessmanager.py | 41 +++++++++++++++++++++++++-----
 1 file changed, 34 insertions(+), 7 deletions(-)

diff --git a/src/buildstream/_cas/casdprocessmanager.py b/src/buildstream/_cas/casdprocessmanager.py
index 2f28910..e83e128 100644
--- a/src/buildstream/_cas/casdprocessmanager.py
+++ b/src/buildstream/_cas/casdprocessmanager.py
@@ -27,10 +27,15 @@ import time
 
 from .. import _signals, utils
 from .._message import Message, MessageType
+from ..types import FastEnum
 
 _CASD_MAX_LOGFILES = 10
 
 
+class ConnectionType(FastEnum):
+    UNIX_SOCKET = 0
+
+
 # CASDProcessManager
 #
 # This manages the subprocess that runs buildbox-casd.
@@ -41,17 +46,23 @@ _CASD_MAX_LOGFILES = 10
 #     log_level (LogLevel): Log level to give to buildbox-casd for logging
 #     cache_quota (int): User configured cache quota
 #     protect_session_blobs (bool): Disable expiry for blobs used in the current session
+#     connection_type (ConnectionType): How to connect to the cas daemon
 #
 class CASDProcessManager:
 
-    def __init__(self, path, log_dir, log_level, cache_quota, protect_session_blobs):
+    def __init__(
+            self,
+            path,
+            log_dir,
+            log_level,
+            cache_quota,
+            protect_session_blobs,
+            connection_type=ConnectionType.UNIX_SOCKET,
+    ):
         self._log_dir = log_dir
 
-        # Place socket in global/user temporary directory to avoid hitting
-        # the socket path length limit.
-        self._socket_tempdir = tempfile.mkdtemp(prefix='buildstream')
-        socket_path = os.path.join(self._socket_tempdir, 'casd.sock')
-        self.connection_string = "unix:" + socket_path
+        assert connection_type == ConnectionType.UNIX_SOCKET
+        self._connection = _UnixSocketConnection()
 
         casd_args = [utils.get_host_tool('buildbox-casd')]
         casd_args.append('--bind=' + self.connection_string)
@@ -79,6 +90,10 @@ class CASDProcessManager:
         self._failure_callback = None
         self._watcher = None
 
+    @property
+    def connection_string(self):
+        return self._connection.connection_string
+
     # _rotate_and_get_next_logfile()
     #
     # Get the logfile to use for casd
@@ -108,7 +123,7 @@ class CASDProcessManager:
     def release_resources(self, messenger=None):
         self._terminate(messenger)
         self._process = None
-        shutil.rmtree(self._socket_tempdir)
+        self._connection.release_resouces()
 
     # _terminate()
     #
@@ -216,3 +231,15 @@ class CASDProcessManager:
         assert self._failure_callback is not None
         self._process.returncode = returncode
         self._failure_callback()
+
+
+class _UnixSocketConnection:
+    def __init__(self):
+        # Place socket in global/user temporary directory to avoid hitting
+        # the socket path length limit.
+        self._socket_tempdir = tempfile.mkdtemp(prefix='buildstream')
+        socket_path = os.path.join(self._socket_tempdir, 'casd.sock')
+        self.connection_string = "unix:" + socket_path
+
+    def release_resouces(self):
+        shutil.rmtree(self._socket_tempdir)


[buildstream] 09/16: scheduler: no redundant signal code on win32

Posted by tv...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tvb pushed a commit to branch aevri/win32_receive_signals
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit df3f398d0b2e31c24700123523f3fd74accd95c0
Author: Angelos Evripiotis <je...@bloomberg.net>
AuthorDate: Fri Oct 4 18:12:12 2019 +0100

    scheduler: no redundant signal code on win32
    
    There's no point disconnecting or blocking signals that don't exist on
    e.g. win32. Some of these signal ids are undefined though, which causes
    an AttributeError, so we want to avoid that.
---
 src/buildstream/_scheduler/scheduler.py | 17 ++++++++++-------
 1 file changed, 10 insertions(+), 7 deletions(-)

diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index d3faa2a..4c648d2 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -256,7 +256,8 @@ class Scheduler():
 
         # Block this until we're finished terminating jobs,
         # this will remain blocked forever.
-        signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGINT])
+        if self.context.platform.does_support_signals():
+            signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGINT])
 
     # jobs_suspended()
     #
@@ -518,14 +519,16 @@ class Scheduler():
     # Connects our signal handler event callbacks to the mainloop
     #
     def _connect_signals(self):
-        self.loop.add_signal_handler(signal.SIGINT, self._interrupt_event)
-        self.loop.add_signal_handler(signal.SIGTERM, self._terminate_event)
-        self.loop.add_signal_handler(signal.SIGTSTP, self._suspend_event)
+        if self.context.platform.does_support_signals():
+            self.loop.add_signal_handler(signal.SIGINT, self._interrupt_event)
+            self.loop.add_signal_handler(signal.SIGTERM, self._terminate_event)
+            self.loop.add_signal_handler(signal.SIGTSTP, self._suspend_event)
 
     def _disconnect_signals(self):
-        self.loop.remove_signal_handler(signal.SIGINT)
-        self.loop.remove_signal_handler(signal.SIGTSTP)
-        self.loop.remove_signal_handler(signal.SIGTERM)
+        if self.context.platform.does_support_signals():
+            self.loop.remove_signal_handler(signal.SIGINT)
+            self.loop.remove_signal_handler(signal.SIGTSTP)
+            self.loop.remove_signal_handler(signal.SIGTERM)
 
     def _terminate_jobs_real(self):
         # 20 seconds is a long time, it can take a while and sometimes


[buildstream] 12/16: CASDProcessManager: 'release_resources' convention

Posted by tv...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tvb pushed a commit to branch aevri/win32_receive_signals
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 5c159ff48d2a766dd4735f589f0121f32dd1e822
Author: Angelos Evripiotis <je...@bloomberg.net>
AuthorDate: Fri Oct 11 13:32:21 2019 +0100

    CASDProcessManager: 'release_resources' convention
    
    Elsewhere in cascache, cleaning up is called 'release_resources', so
    follow that convention for consistency.
    
    Also fix a case where self._process was not set to None in terminate().
---
 src/buildstream/_cas/cascache.py           |  3 +--
 src/buildstream/_cas/casdprocessmanager.py | 25 +++++++++++--------------
 2 files changed, 12 insertions(+), 16 deletions(-)

diff --git a/src/buildstream/_cas/cascache.py b/src/buildstream/_cas/cascache.py
index 7c37d0e..3e8edce 100644
--- a/src/buildstream/_cas/cascache.py
+++ b/src/buildstream/_cas/cascache.py
@@ -177,8 +177,7 @@ class CASCache():
 
         if self._casd_process_manager:
             self.close_grpc_channels()
-            self._casd_process_manager.terminate(messenger)
-            self._casd_process_manager.clean_up()
+            self._casd_process_manager.release_resources(messenger)
             self._casd_process_manager = None
 
     # contains():
diff --git a/src/buildstream/_cas/casdprocessmanager.py b/src/buildstream/_cas/casdprocessmanager.py
index 1ae5e8e..6c40725 100644
--- a/src/buildstream/_cas/casdprocessmanager.py
+++ b/src/buildstream/_cas/casdprocessmanager.py
@@ -100,14 +100,20 @@ class CASDProcessManager:
 
         return os.path.join(self._log_dir, str(self.start_time) + ".log")
 
-    # terminate()
+    # release_resources()
     #
-    # Terminate the buildbox casd process
+    # Terminate the process and release related resources.
     #
-    # Args:
-    #   messenger (buildstream._messenger.Messenger): Messenger to forward information to the frontend
+    def release_resources(self, messenger=None):
+        self._terminate(messenger)
+        self._process = None
+        shutil.rmtree(self._socket_tempdir)
+
+    # _terminate()
+    #
+    # Terminate the buildbox casd process.
     #
-    def terminate(self, messenger=None):
+    def _terminate(self, messenger=None):
         assert self._watcher is None
         assert self._failure_callback is None
 
@@ -115,7 +121,6 @@ class CASDProcessManager:
 
         if return_code is not None:
             # buildbox-casd is already dead
-            self._process = None
 
             if messenger:
                 messenger.message(
@@ -149,7 +154,6 @@ class CASDProcessManager:
                         messenger.message(
                             Message(MessageType.WARN, "Buildbox-casd didn't exit in time and has been killed")
                         )
-                    self._process = None
                     return
 
         if return_code != 0 and messenger:
@@ -162,13 +166,6 @@ class CASDProcessManager:
                 )
             )
 
-    # clean_up()
-    #
-    # After termination, clean up any additional resources
-    #
-    def clean_up(self):
-        shutil.rmtree(self._socket_tempdir)
-
     # set_failure_callback()
     #
     # Call this function if the CASD process stops unexpectedly.


[buildstream] 15/16: CASDProcessManager: expect return code 1 on win32

Posted by tv...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tvb pushed a commit to branch aevri/win32_receive_signals
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 7d0bff1ff0b9e32bf9555ed8b68f8e1caed677a2
Author: Angelos Evripiotis <je...@bloomberg.net>
AuthorDate: Tue Oct 15 13:08:34 2019 +0100

    CASDProcessManager: expect return code 1 on win32
---
 src/buildstream/_cas/casdprocessmanager.py | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)

diff --git a/src/buildstream/_cas/casdprocessmanager.py b/src/buildstream/_cas/casdprocessmanager.py
index e83e128..fb6caf8 100644
--- a/src/buildstream/_cas/casdprocessmanager.py
+++ b/src/buildstream/_cas/casdprocessmanager.py
@@ -22,6 +22,7 @@ import os
 import shutil
 import signal
 import subprocess
+import sys
 import tempfile
 import time
 
@@ -172,7 +173,15 @@ class CASDProcessManager:
                         )
                     return
 
-        if return_code != 0 and messenger:
+        expected_return_code = 0
+        if sys.platform == "win32":
+            # Note that the return code is "1" on Windows by definition -
+            # `Popen.terminate()` calls `TerminateProcess()`, which specifies
+            # the return code for the process. Python specifies "1" as the
+            # return code.
+            expected_return_code = 1
+
+        if return_code != expected_return_code and messenger:
             messenger.message(
                 Message(
                     MessageType.BUG,


[buildstream] 02/16: testing/.../site: del unnecessary global 'version'

Posted by tv...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tvb pushed a commit to branch aevri/win32_receive_signals
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 2d3eac0cfb2bd1fe27088e6cdfeeec0d01d0fbc3
Author: Angelos Evripiotis <je...@bloomberg.net>
AuthorDate: Tue Oct 15 14:22:00 2019 +0100

    testing/.../site: del unnecessary global 'version'
    
    It's not doing any harm, but I don't think we mean to have that in the
    module's namespace.
---
 src/buildstream/testing/_utils/site.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/src/buildstream/testing/_utils/site.py b/src/buildstream/testing/_utils/site.py
index ca74d95..c614794 100644
--- a/src/buildstream/testing/_utils/site.py
+++ b/src/buildstream/testing/_utils/site.py
@@ -20,6 +20,7 @@ try:
     # e.g. on Mac via Homebrew we get "git version 2.19.0".
     version = tuple(int(x) for x in out.split(' ')[2].split('.')[:3])
     HAVE_OLD_GIT = version < (1, 8, 5)
+    del version
 
     GIT_ENV = {
         'GIT_AUTHOR_DATE': '1320966000 +0200',


[buildstream] 01/16: testing/.../site: windows-friendly HAVE_OLD_GIT

Posted by tv...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tvb pushed a commit to branch aevri/win32_receive_signals
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 83a538eee784873742397c05efda9fe18bb9e272
Author: Angelos Evripiotis <je...@bloomberg.net>
AuthorDate: Tue Oct 15 14:21:55 2019 +0100

    testing/.../site: windows-friendly HAVE_OLD_GIT
---
 src/buildstream/testing/_utils/site.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/src/buildstream/testing/_utils/site.py b/src/buildstream/testing/_utils/site.py
index b098b7d..ca74d95 100644
--- a/src/buildstream/testing/_utils/site.py
+++ b/src/buildstream/testing/_utils/site.py
@@ -16,7 +16,9 @@ try:
     HAVE_GIT = True
 
     out = str(subprocess.check_output(['git', '--version']), "utf-8")
-    version = tuple(int(x) for x in out.split(' ')[2].split('.'))
+    # e.g. on Git for Windows we get "git version 2.21.0.windows.1".
+    # e.g. on Mac via Homebrew we get "git version 2.19.0".
+    version = tuple(int(x) for x in out.split(' ')[2].split('.')[:3])
     HAVE_OLD_GIT = version < (1, 8, 5)
 
     GIT_ENV = {


[buildstream] 10/16: _signals.blocked: early-out on win32

Posted by tv...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tvb pushed a commit to branch aevri/win32_receive_signals
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit d80a5f50975fa398749b172b4b729d163bde6f06
Author: Angelos Evripiotis <je...@bloomberg.net>
AuthorDate: Tue Oct 8 15:09:39 2019 +0100

    _signals.blocked: early-out on win32
---
 src/buildstream/_signals.py | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/src/buildstream/_signals.py b/src/buildstream/_signals.py
index a29cbdc..92e760e 100644
--- a/src/buildstream/_signals.py
+++ b/src/buildstream/_signals.py
@@ -179,6 +179,11 @@ def suspendable(suspend_callback, resume_callback):
 #
 @contextmanager
 def blocked(signal_list, ignore=True):
+    if sys.platform == 'win32':
+        # Win32 does not support any signals that we are interested in, and we
+        # also can't use `signal.pthread_sigmask`, so early out here.
+        yield
+        return
 
     with ExitStack() as stack:
 


[buildstream] 08/16: job: no redundant signal code on win32

Posted by tv...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tvb pushed a commit to branch aevri/win32_receive_signals
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 075cf2aaab2ee483bd4b98ba37f813de886fc51a
Author: Angelos Evripiotis <je...@bloomberg.net>
AuthorDate: Fri Oct 4 18:11:02 2019 +0100

    job: no redundant signal code on win32
    
    There's no point blocking or handling signals that aren't supported on
    e.g. win32. Some of these signal ids aren't defined on unsupported
    platforms though, which would lead to an AttributeError at runtime.
    Avoid that by not invoking the redundant code.
---
 src/buildstream/_scheduler/jobs/job.py | 35 +++++++++++++++++++++-------------
 1 file changed, 22 insertions(+), 13 deletions(-)

diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index 7d901a3..677fccc 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -200,6 +200,7 @@ class Job():
         self._tries += 1
         self._parent_start_listening()
 
+        does_platform_support_signals = self._scheduler.context.platform.does_support_signals()
         child_job = self.create_child_job(  # pylint: disable=assignment-from-no-return
             action_name=self.action_name,
             messenger=self._scheduler.context.messenger,
@@ -209,6 +210,7 @@ class Job():
             tries=self._tries,
             message_element_name=self._message_element_name,
             message_element_key=self._message_element_key,
+            does_platform_support_signals=does_platform_support_signals,
         )
 
         if self._scheduler.context.platform.does_multiprocessing_start_require_pickling():
@@ -228,7 +230,10 @@ class Job():
         # the child process does not inherit the parent's state, but the main
         # process will be notified of any signal after we launch the child.
         #
-        with _signals.blocked([signal.SIGINT, signal.SIGTSTP, signal.SIGTERM], ignore=False):
+        if does_platform_support_signals:
+            with _signals.blocked([signal.SIGINT, signal.SIGTSTP, signal.SIGTERM], ignore=False):
+                self._process.start()
+        else:
             self._process.start()
 
         # Wait for the child task to complete.
@@ -631,7 +636,8 @@ class ChildJob():
 
     def __init__(
             self, action_name, messenger, logdir, logfile, max_retries, tries,
-            message_element_name, message_element_key):
+            message_element_name, message_element_key,
+            does_platform_support_signals):
 
         self.action_name = action_name
 
@@ -643,6 +649,8 @@ class ChildJob():
         self._message_element_name = message_element_name
         self._message_element_key = message_element_key
 
+        self._does_platform_support_signals = does_platform_support_signals
+
         self._queue = None
 
     # message():
@@ -732,17 +740,18 @@ class ChildJob():
     #
     def child_action(self, queue):
 
-        # This avoids some SIGTSTP signals from grandchildren
-        # getting propagated up to the master process
-        os.setsid()
-
-        # First set back to the default signal handlers for the signals
-        # we handle, and then clear their blocked state.
-        #
-        signal_list = [signal.SIGTSTP, signal.SIGTERM]
-        for sig in signal_list:
-            signal.signal(sig, signal.SIG_DFL)
-        signal.pthread_sigmask(signal.SIG_UNBLOCK, signal_list)
+        if self._does_platform_support_signals:
+            # This avoids some SIGTSTP signals from grandchildren
+            # getting propagated up to the master process
+            os.setsid()
+
+            # First set back to the default signal handlers for the signals
+            # we handle, and then clear their blocked state.
+            #
+            signal_list = [signal.SIGTSTP, signal.SIGTERM]
+            for sig in signal_list:
+                signal.signal(sig, signal.SIG_DFL)
+            signal.pthread_sigmask(signal.SIG_UNBLOCK, signal_list)
 
         # Assign the queue we passed across the process boundaries
         #


[buildstream] 11/16: cascache: extract CASDProcess in new module

Posted by tv...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tvb pushed a commit to branch aevri/win32_receive_signals
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 998942215c7a33227ead23bc182f379ef062c47a
Author: Angelos Evripiotis <je...@bloomberg.net>
AuthorDate: Tue Oct 8 17:14:32 2019 +0100

    cascache: extract CASDProcess in new module
    
    Make it easier to specialize handling of the buildbox-casd process on
    Windows, by splitting it into its own class. This allows us to
    encapsulate some decisions, and decreases the complexity of the CASCache
    class.
    
    Take some of the complexity out of this file by splitting the
    responsibility of managing the process out to another file.
---
 src/buildstream/_cas/cascache.py           | 149 +++----------------
 src/buildstream/_cas/casdprocessmanager.py | 220 +++++++++++++++++++++++++++++
 src/buildstream/_scheduler/scheduler.py    |  22 +--
 src/buildstream/_stream.py                 |   2 +-
 4 files changed, 245 insertions(+), 148 deletions(-)

diff --git a/src/buildstream/_cas/cascache.py b/src/buildstream/_cas/cascache.py
index 83b8e85..7c37d0e 100644
--- a/src/buildstream/_cas/cascache.py
+++ b/src/buildstream/_cas/cascache.py
@@ -25,10 +25,7 @@ import errno
 import contextlib
 import ctypes
 import multiprocessing
-import shutil
 import signal
-import subprocess
-import tempfile
 import time
 
 import grpc
@@ -40,8 +37,8 @@ from .._protos.build.buildgrid import local_cas_pb2, local_cas_pb2_grpc
 from .. import _signals, utils
 from ..types import FastEnum
 from .._exceptions import CASCacheError
-from .._message import Message, MessageType
 
+from .casdprocessmanager import CASDProcessManager
 from .casremote import _CASBatchRead, _CASBatchUpdate
 
 _BUFFER_SIZE = 65536
@@ -50,8 +47,6 @@ _BUFFER_SIZE = 65536
 # Refresh interval for disk usage of local cache in seconds
 _CACHE_USAGE_REFRESH = 5
 
-_CASD_MAX_LOGFILES = 10
-
 
 class CASLogLevel(FastEnum):
     WARNING = "warning"
@@ -80,35 +75,11 @@ class CASCache():
         os.makedirs(self.tmpdir, exist_ok=True)
 
         if casd:
-            # Place socket in global/user temporary directory to avoid hitting
-            # the socket path length limit.
-            self._casd_socket_tempdir = tempfile.mkdtemp(prefix='buildstream')
-            self._casd_socket_path = os.path.join(self._casd_socket_tempdir, 'casd.sock')
-
-            casd_args = [utils.get_host_tool('buildbox-casd')]
-            casd_args.append('--bind=unix:' + self._casd_socket_path)
-            casd_args.append('--log-level=' + log_level.value)
-
-            if cache_quota is not None:
-                casd_args.append('--quota-high={}'.format(int(cache_quota)))
-                casd_args.append('--quota-low={}'.format(int(cache_quota / 2)))
-
-                if protect_session_blobs:
-                    casd_args.append('--protect-session-blobs')
-
-            casd_args.append(path)
-
-            self._casd_start_time = time.time()
-            self.casd_logfile = self._rotate_and_get_next_logfile()
-
-            with open(self.casd_logfile, "w") as logfile_fp:
-                # Block SIGINT on buildbox-casd, we don't need to stop it
-                # The frontend will take care of it if needed
-                with _signals.blocked([signal.SIGINT], ignore=False):
-                    self._casd_process = subprocess.Popen(
-                        casd_args, cwd=path, stdout=logfile_fp, stderr=subprocess.STDOUT)
+            log_dir = os.path.join(self.casdir, "logs")
+            self._casd_process_manager = CASDProcessManager(
+                path, log_dir, log_level, cache_quota, protect_session_blobs)
         else:
-            self._casd_process = None
+            self._casd_process_manager = None
 
         self._casd_channel = None
         self._casd_cas = None
@@ -120,16 +91,16 @@ class CASCache():
 
         # Popen objects are not pickle-able, however, child processes only
         # need the information whether a casd subprocess was started or not.
-        assert '_casd_process' in state
-        state['_casd_process'] = bool(self._casd_process)
+        assert '_casd_process_manager' in state
+        state['_casd_process_manager'] = bool(self._casd_process_manager)
 
         return state
 
     def _init_casd(self):
-        assert self._casd_process, "CASCache was instantiated without buildbox-casd"
+        assert self._casd_process_manager, "CASCache was instantiated without buildbox-casd"
 
         if not self._casd_channel:
-            self._casd_channel = grpc.insecure_channel('unix:' + self._casd_socket_path)
+            self._casd_channel = grpc.insecure_channel('unix:' + self._casd_process_manager.socket_path)
             self._casd_cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self._casd_channel)
             self._local_cas = local_cas_pb2_grpc.LocalContentAddressableStorageStub(self._casd_channel)
 
@@ -143,7 +114,7 @@ class CASCache():
                     if e.code() == grpc.StatusCode.UNAVAILABLE:
                         # casd is not ready yet, try again after a 10ms delay,
                         # but don't wait for more than 15s
-                        if time.time() < self._casd_start_time + 15:
+                        if time.time() < self._casd_process_manager.start_time + 15:
                             time.sleep(1 / 100)
                             continue
 
@@ -204,10 +175,11 @@ class CASCache():
         if self._cache_usage_monitor:
             self._cache_usage_monitor.release_resources()
 
-        if self._casd_process:
+        if self._casd_process_manager:
             self.close_grpc_channels()
-            self._terminate_casd_process(messenger)
-            shutil.rmtree(self._casd_socket_tempdir)
+            self._casd_process_manager.terminate(messenger)
+            self._casd_process_manager.clean_up()
+            self._casd_process_manager = None
 
     # contains():
     #
@@ -684,30 +656,6 @@ class CASCache():
     #             Local Private Methods            #
     ################################################
 
-    # _rotate_and_get_next_logfile()
-    #
-    # Get the logfile to use for casd
-    #
-    # This will ensure that we don't create too many casd log files by
-    # rotating the logs and only keeping _CASD_MAX_LOGFILES logs around.
-    #
-    # Returns:
-    #   (str): the path to the log file to use
-    #
-    def _rotate_and_get_next_logfile(self):
-        log_dir = os.path.join(self.casdir, "logs")
-
-        try:
-            existing_logs = sorted(os.listdir(log_dir))
-        except FileNotFoundError:
-            os.makedirs(log_dir)
-        else:
-            while len(existing_logs) >= _CASD_MAX_LOGFILES:
-                logfile_to_delete = existing_logs.pop(0)
-                os.remove(os.path.join(log_dir, logfile_to_delete))
-
-        return os.path.join(log_dir, str(self._casd_start_time) + ".log")
-
     def _refpath(self, ref):
         return os.path.join(self.casdir, 'refs', 'heads', ref)
 
@@ -976,67 +924,6 @@ class CASCache():
         # Upload any blobs missing on the server
         self.send_blobs(remote, missing_blobs)
 
-    # _terminate_casd_process()
-    #
-    # Terminate the buildbox casd process
-    #
-    # Args:
-    #   messenger (buildstream._messenger.Messenger): Messenger to forward information to the frontend
-    #
-    def _terminate_casd_process(self, messenger=None):
-        return_code = self._casd_process.poll()
-
-        if return_code is not None:
-            # buildbox-casd is already dead
-            self._casd_process = None
-
-            if messenger:
-                messenger.message(
-                    Message(
-                        MessageType.BUG,
-                        "Buildbox-casd died during the run. Exit code: {}, Logs: {}".format(
-                            return_code, self.casd_logfile
-                        ),
-                    )
-                )
-            return
-
-        self._casd_process.terminate()
-
-        try:
-            # Don't print anything if buildbox-casd terminates quickly
-            return_code = self._casd_process.wait(timeout=0.5)
-        except subprocess.TimeoutExpired:
-            if messenger:
-                cm = messenger.timed_activity("Terminating buildbox-casd")
-            else:
-                cm = contextlib.suppress()
-            with cm:
-                try:
-                    return_code = self._casd_process.wait(timeout=15)
-                except subprocess.TimeoutExpired:
-                    self._casd_process.kill()
-                    self._casd_process.wait(timeout=15)
-
-                    if messenger:
-                        messenger.message(
-                            Message(MessageType.WARN, "Buildbox-casd didn't exit in time and has been killed")
-                        )
-                    self._casd_process = None
-                    return
-
-        if return_code != 0 and messenger:
-            messenger.message(
-                Message(
-                    MessageType.BUG,
-                    "Buildbox-casd didn't exit cleanly. Exit code: {}, Logs: {}".format(
-                        return_code, self.casd_logfile
-                    ),
-                )
-            )
-
-        self._casd_process = None
-
     # get_cache_usage():
     #
     # Fetches the current usage of the CAS local cache.
@@ -1050,16 +937,16 @@ class CASCache():
 
         return self._cache_usage_monitor.get_cache_usage()
 
-    # get_casd_process()
+    # get_casd_process_manager()
     #
     # Get the underlying buildbox-casd process
     #
     # Returns:
     #   (subprocess.Process): The casd process that is used for the current cascache
     #
-    def get_casd_process(self):
-        assert self._casd_process is not None, "This should only be called with a running buildbox-casd process"
-        return self._casd_process
+    def get_casd_process_manager(self):
+        assert self._casd_process_manager is not None, "Only call this with a running buildbox-casd process"
+        return self._casd_process_manager
 
 
 # _CASCacheUsage
diff --git a/src/buildstream/_cas/casdprocessmanager.py b/src/buildstream/_cas/casdprocessmanager.py
new file mode 100644
index 0000000..1ae5e8e
--- /dev/null
+++ b/src/buildstream/_cas/casdprocessmanager.py
@@ -0,0 +1,220 @@
+#
+#  Copyright (C) 2018 Codethink Limited
+#  Copyright (C) 2018-2019 Bloomberg Finance LP
+#
+#  This program is free software; you can redistribute it and/or
+#  modify it under the terms of the GNU Lesser General Public
+#  License as published by the Free Software Foundation; either
+#  version 2 of the License, or (at your option) any later version.
+#
+#  This library is distributed in the hope that it will be useful,
+#  but WITHOUT ANY WARRANTY; without even the implied warranty of
+#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.	 See the GNU
+#  Lesser General Public License for more details.
+#
+#  You should have received a copy of the GNU Lesser General Public
+#  License along with this library. If not, see <http://www.gnu.org/licenses/>.
+#
+
+import asyncio
+import contextlib
+import os
+import shutil
+import signal
+import subprocess
+import tempfile
+import time
+
+from .. import _signals, utils
+from .._message import Message, MessageType
+
+_CASD_MAX_LOGFILES = 10
+
+
+# CASDProcessManager
+#
+# This manages the subprocess that runs buildbox-casd.
+#
+# Args:
+#     path (str): The root directory for the CAS repository
+#     log_dir (str): The directory for the logs
+#     log_level (LogLevel): Log level to give to buildbox-casd for logging
+#     cache_quota (int): User configured cache quota
+#     protect_session_blobs (bool): Disable expiry for blobs used in the current session
+#
+class CASDProcessManager:
+
+    def __init__(self, path, log_dir, log_level, cache_quota, protect_session_blobs):
+        self._log_dir = log_dir
+
+        # Place socket in global/user temporary directory to avoid hitting
+        # the socket path length limit.
+        self._socket_tempdir = tempfile.mkdtemp(prefix='buildstream')
+        self.socket_path = os.path.join(self._socket_tempdir, 'casd.sock')
+
+        casd_args = [utils.get_host_tool('buildbox-casd')]
+        casd_args.append('--bind=unix:' + self.socket_path)
+        casd_args.append('--log-level=' + log_level.value)
+
+        if cache_quota is not None:
+            casd_args.append('--quota-high={}'.format(int(cache_quota)))
+            casd_args.append('--quota-low={}'.format(int(cache_quota / 2)))
+
+            if protect_session_blobs:
+                casd_args.append('--protect-session-blobs')
+
+        casd_args.append(path)
+
+        self.start_time = time.time()
+        self.logfile = self._rotate_and_get_next_logfile()
+
+        with open(self.logfile, "w") as logfile_fp:
+            # Block SIGINT on buildbox-casd, we don't need to stop it
+            # The frontend will take care of it if needed
+            with _signals.blocked([signal.SIGINT], ignore=False):
+                self._process = subprocess.Popen(
+                    casd_args, cwd=path, stdout=logfile_fp, stderr=subprocess.STDOUT)
+
+        self._failure_callback = None
+        self._watcher = None
+
+    # _rotate_and_get_next_logfile()
+    #
+    # Get the logfile to use for casd
+    #
+    # This will ensure that we don't create too many casd log files by
+    # rotating the logs and only keeping _CASD_MAX_LOGFILES logs around.
+    #
+    # Returns:
+    #   (str): the path to the log file to use
+    #
+    def _rotate_and_get_next_logfile(self):
+        try:
+            existing_logs = sorted(os.listdir(self._log_dir))
+        except FileNotFoundError:
+            os.makedirs(self._log_dir)
+        else:
+            while len(existing_logs) >= _CASD_MAX_LOGFILES:
+                logfile_to_delete = existing_logs.pop(0)
+                os.remove(os.path.join(self._log_dir, logfile_to_delete))
+
+        return os.path.join(self._log_dir, str(self.start_time) + ".log")
+
+    # terminate()
+    #
+    # Terminate the buildbox casd process
+    #
+    # Args:
+    #   messenger (buildstream._messenger.Messenger): Messenger to forward information to the frontend
+    #
+    def terminate(self, messenger=None):
+        assert self._watcher is None
+        assert self._failure_callback is None
+
+        return_code = self._process.poll()
+
+        if return_code is not None:
+            # buildbox-casd is already dead
+            self._process = None
+
+            if messenger:
+                messenger.message(
+                    Message(
+                        MessageType.BUG,
+                        "Buildbox-casd died during the run. Exit code: {}, Logs: {}".format(
+                            return_code, self.logfile
+                        ),
+                    )
+                )
+            return
+
+        self._process.terminate()
+
+        try:
+            # Don't print anything if buildbox-casd terminates quickly
+            return_code = self._process.wait(timeout=0.5)
+        except subprocess.TimeoutExpired:
+            if messenger:
+                cm = messenger.timed_activity("Terminating buildbox-casd")
+            else:
+                cm = contextlib.suppress()
+            with cm:
+                try:
+                    return_code = self._process.wait(timeout=15)
+                except subprocess.TimeoutExpired:
+                    self._process.kill()
+                    self._process.wait(timeout=15)
+
+                    if messenger:
+                        messenger.message(
+                            Message(MessageType.WARN, "Buildbox-casd didn't exit in time and has been killed")
+                        )
+                    self._process = None
+                    return
+
+        if return_code != 0 and messenger:
+            messenger.message(
+                Message(
+                    MessageType.BUG,
+                    "Buildbox-casd didn't exit cleanly. Exit code: {}, Logs: {}".format(
+                        return_code, self.logfile
+                    ),
+                )
+            )
+
+    # clean_up()
+    #
+    # After termination, clean up any additional resources
+    #
+    def clean_up(self):
+        shutil.rmtree(self._socket_tempdir)
+
+    # set_failure_callback()
+    #
+    # Register a callable to be invoked if the CASD process stops unexpectedly.
+    #
+    # Note that we guarantee that the lifetime of any 'watcher' used is bound
+    # to the lifetime of the callback - we won't hang on to the asyncio loop
+    # longer than necessary.
+    #
+    # We won't be able to use watchers on win32, so we'll need to support
+    # another approach.
+    #
+    # Args:
+    #   func (callable): a callable that takes no parameters
+    #
+    def set_failure_callback(self, func):
+        assert func is not None
+        assert self._watcher is None
+        assert self._failure_callback is None, "We only support one callback for now"
+        self._failure_callback = func
+        self._watcher = asyncio.get_child_watcher()
+        self._watcher.add_child_handler(self._process.pid, self._on_casd_failure)
+
+    # clear_failure_callback()
+    #
+    # No longer call this callable if the CASD process stops unexpectedly
+    #
+    # Args:
+    #   func (callable): The callable that was provided to set_failure_callback().
+    #                    Supplying this again allows us to do error checking.
+    #
+    def clear_failure_callback(self, func):
+        assert func is not None
+        assert self._failure_callback == func, "We only support one callback for now"
+        self._watcher.remove_child_handler(self._process.pid)
+        self._failure_callback = None
+        self._watcher = None
+
+    # _on_casd_failure()
+    #
+    # Handler for casd process terminating unexpectedly
+    #
+    # Args:
+    #   pid (int): the process id under which buildbox-casd was running
+    #   returncode (int): the return code with which buildbox-casd exited
+    #
+    def _on_casd_failure(self, pid, returncode):
+        assert self._failure_callback is not None
+        self._process.returncode = returncode
+        self._failure_callback()
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 4c648d2..24086be 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -138,7 +138,6 @@ class Scheduler():
         self._suspendtime = None              # Session time compensation for suspended state
         self._queue_jobs = True               # Whether we should continue to queue jobs
         self._state = state
-        self._casd_process = None             # handle to the casd process for monitoring purpose
 
         # Bidirectional queue to send notifications back to the Scheduler's owner
         self._notification_queue = notification_queue
@@ -152,8 +151,8 @@ class Scheduler():
     #
     # Args:
     #    queues (list): A list of Queue objects
-    #    casd_processes (subprocess.Process): The subprocess which runs casd in order to be notified
-    #                                         of failures.
+    #    casd_process_manager (cascache.CASDProcessManager): The subprocess which runs casd, in order to be notified
+    #                                                        of failures.
     #
     # Returns:
     #    (SchedStatus): How the scheduling terminated
@@ -163,7 +162,7 @@ class Scheduler():
     # elements have been processed by each queue or when
     # an error arises
     #
-    def run(self, queues, casd_process):
+    def run(self, queues, casd_process_manager):
 
         # Hold on to the queues to process
         self.queues = queues
@@ -183,9 +182,7 @@ class Scheduler():
         self._connect_signals()
 
         # Watch casd while running to ensure it doesn't die
-        self._casd_process = casd_process
-        _watcher = asyncio.get_child_watcher()
-        _watcher.add_child_handler(casd_process.pid, self._abort_on_casd_failure)
+        casd_process_manager.set_failure_callback(self._abort_on_casd_failure)
 
         # Start the profiler
         with PROFILER.profile(Topics.SCHEDULER, "_".join(queue.action_name for queue in self.queues)):
@@ -195,8 +192,7 @@ class Scheduler():
             self.loop.close()
 
         # Stop watching casd
-        _watcher.remove_child_handler(casd_process.pid)
-        self._casd_process = None
+        casd_process_manager.clear_failure_callback(self._abort_on_casd_failure)
 
         # Stop handling unix signals
         self._disconnect_signals()
@@ -338,15 +334,9 @@ class Scheduler():
     # This will terminate immediately all jobs, since buildbox-casd is dead,
     # we can't do anything with them anymore.
     #
-    # Args:
-    #   pid (int): the process id under which buildbox-casd was running
-    #   returncode (int): the return code with which buildbox-casd exited
-    #
-    def _abort_on_casd_failure(self, pid, returncode):
+    def _abort_on_casd_failure(self):
         message = Message(MessageType.BUG, "buildbox-casd died while the pipeline was active.")
         self._notify(Notification(NotificationType.MESSAGE, message=message))
-
-        self._casd_process.returncode = returncode
         self.terminate_jobs()
 
     # _start_job()
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 6e4e5ca..500adb8 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -1375,7 +1375,7 @@ class Stream():
         if self._session_start_callback is not None:
             self._session_start_callback()
 
-        status = self._scheduler.run(self.queues, self._context.get_cascache().get_casd_process())
+        status = self._scheduler.run(self.queues, self._context.get_cascache().get_casd_process_manager())
 
         if status == SchedStatus.ERROR:
             raise StreamError()


[buildstream] 13/16: cascache: refactor, extract "connection_string"

Posted by tv...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tvb pushed a commit to branch aevri/win32_receive_signals
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit e1c16f309ec2c55892207c04fa04d94f9c241a05
Author: Angelos Evripiotis <je...@bloomberg.net>
AuthorDate: Mon Oct 14 13:53:00 2019 +0100

    cascache: refactor, extract "connection_string"
    
    Prepare for other kinds of connection, by reducing repetition and having
    a single point for configuring the connection string.
---
 src/buildstream/_cas/cascache.py           | 2 +-
 src/buildstream/_cas/casdprocessmanager.py | 5 +++--
 2 files changed, 4 insertions(+), 3 deletions(-)

diff --git a/src/buildstream/_cas/cascache.py b/src/buildstream/_cas/cascache.py
index 3e8edce..4114d9f 100644
--- a/src/buildstream/_cas/cascache.py
+++ b/src/buildstream/_cas/cascache.py
@@ -100,7 +100,7 @@ class CASCache():
         assert self._casd_process_manager, "CASCache was instantiated without buildbox-casd"
 
         if not self._casd_channel:
-            self._casd_channel = grpc.insecure_channel('unix:' + self._casd_process_manager.socket_path)
+            self._casd_channel = grpc.insecure_channel(self._casd_process_manager.connection_string)
             self._casd_cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self._casd_channel)
             self._local_cas = local_cas_pb2_grpc.LocalContentAddressableStorageStub(self._casd_channel)
 
diff --git a/src/buildstream/_cas/casdprocessmanager.py b/src/buildstream/_cas/casdprocessmanager.py
index 6c40725..2f28910 100644
--- a/src/buildstream/_cas/casdprocessmanager.py
+++ b/src/buildstream/_cas/casdprocessmanager.py
@@ -50,10 +50,11 @@ class CASDProcessManager:
         # Place socket in global/user temporary directory to avoid hitting
         # the socket path length limit.
         self._socket_tempdir = tempfile.mkdtemp(prefix='buildstream')
-        self.socket_path = os.path.join(self._socket_tempdir, 'casd.sock')
+        socket_path = os.path.join(self._socket_tempdir, 'casd.sock')
+        self.connection_string = "unix:" + socket_path
 
         casd_args = [utils.get_host_tool('buildbox-casd')]
-        casd_args.append('--bind=unix:' + self.socket_path)
+        casd_args.append('--bind=' + self.connection_string)
         casd_args.append('--log-level=' + log_level.value)
 
         if cache_quota is not None:


[buildstream] 04/16: _signals.suspendable:"outermost" -> "is_outermost"

Posted by tv...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tvb pushed a commit to branch aevri/win32_receive_signals
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 127070497b16505c1d80370294bb52114815581a
Author: Angelos Evripiotis <je...@bloomberg.net>
AuthorDate: Tue Oct 8 09:24:25 2019 +0100

    _signals.suspendable:"outermost" -> "is_outermost"
---
 src/buildstream/_signals.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/src/buildstream/_signals.py b/src/buildstream/_signals.py
index 31982c1..ec598fc 100644
--- a/src/buildstream/_signals.py
+++ b/src/buildstream/_signals.py
@@ -146,17 +146,17 @@ def suspend_handler(sig, frame):
 def suspendable(suspend_callback, resume_callback):
     global suspendable_stack                  # pylint: disable=global-statement
 
-    outermost = bool(not suspendable_stack)
+    is_outermost = bool(not suspendable_stack)
     suspender = Suspender(suspend_callback, resume_callback)
     suspendable_stack.append(suspender)
 
-    if outermost:
+    if is_outermost:
         original_stop = signal.signal(signal.SIGTSTP, suspend_handler)
 
     try:
         yield
     finally:
-        if outermost:
+        if is_outermost:
             signal.signal(signal.SIGTSTP, original_stop)
 
         suspendable_stack.pop()


[buildstream] 03/16: _platform: add does_support_signals() accessor

Posted by tv...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tvb pushed a commit to branch aevri/win32_receive_signals
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit ca9dd39eb5d62bcf86fc811332c974d348531039
Author: Angelos Evripiotis <je...@bloomberg.net>
AuthorDate: Fri Oct 4 18:07:53 2019 +0100

    _platform: add does_support_signals() accessor
    
    We'll need this information for doing the right thing with signals on
    Windows.
---
 src/buildstream/_platform/platform.py | 12 ++++++++++++
 src/buildstream/_platform/win32.py    | 28 ++++++++++++++++++++++++++++
 2 files changed, 40 insertions(+)

diff --git a/src/buildstream/_platform/platform.py b/src/buildstream/_platform/platform.py
index af49b9e..b364ef8 100644
--- a/src/buildstream/_platform/platform.py
+++ b/src/buildstream/_platform/platform.py
@@ -190,6 +190,18 @@ class Platform():
         # set to the platform default by `get_start_method`.
         return multiprocessing.get_start_method() != 'fork'
 
+    # does_support_signals():
+    #
+    # Returns True if the platform has good support for signals, this will not
+    # be true for Windows.
+    #
+    # Returns:
+    #    (bool): Whether signals are supported or not
+    #
+    def does_support_signals(self):
+        # Most platforms support signals, so the default is True.
+        return True
+
     ##################################################################
     #                        Sandbox functions                       #
     ##################################################################
diff --git a/src/buildstream/_platform/win32.py b/src/buildstream/_platform/win32.py
index 3668001..8dc2d8e 100644
--- a/src/buildstream/_platform/win32.py
+++ b/src/buildstream/_platform/win32.py
@@ -57,3 +57,31 @@ class Win32(Platform):
         self.check_sandbox_config = Win32._check_dummy_sandbox_config
         self.create_sandbox = Win32._create_dummy_sandbox
         return True
+
+    def does_support_signals(self):
+        # Windows does not have good support for signals, and we shouldn't
+        # handle them in the same way we do on UNIX.
+        #
+        # From the MSDN docs:
+        #
+        # > SIGINT is not supported for any Win32 application. When a CTRL+C
+        # > interrupt occurs, Win32 operating systems generate a new thread to
+        # > specifically handle that interrupt. This can cause a single-thread
+        # > application, such as one in UNIX, to become multithreaded and cause
+        # > unexpected behavior.
+        #
+        # > The SIGILL and SIGTERM signals are not generated under Windows.
+        # > They are included for ANSI compatibility. Therefore, you can set
+        # > signal handlers for these signals by using signal, and you can also
+        # > explicitly generate these signals by calling raise.
+        #
+        # The only other signals that are defined in signal.h on Windows are
+        # not relevant to us:
+        #
+        # - SIGABRT
+        # - SIGFPE
+        # - SIGSEGV
+        #
+        # https://docs.microsoft.com/en-gb/cpp/c-runtime-library/reference/signal
+        #
+        return False


[buildstream] 07/16: jobs/job: use named parameters for long list

Posted by tv...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tvb pushed a commit to branch aevri/win32_receive_signals
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 6c99e0e3b2a039a8d4b1f3894f437a6c853c9b69
Author: Angelos Evripiotis <je...@bloomberg.net>
AuthorDate: Tue Oct 8 13:45:43 2019 +0100

    jobs/job: use named parameters for long list
    
    This invocation seems potentially error-prone as it is quite long, use
    named parameters to make it a bit safer.
---
 src/buildstream/_scheduler/jobs/job.py | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index 940b7d2..7d901a3 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -201,14 +201,14 @@ class Job():
         self._parent_start_listening()
 
         child_job = self.create_child_job(  # pylint: disable=assignment-from-no-return
-            self.action_name,
-            self._scheduler.context.messenger,
-            self._scheduler.context.logdir,
-            self._logfile,
-            self._max_retries,
-            self._tries,
-            self._message_element_name,
-            self._message_element_key
+            action_name=self.action_name,
+            messenger=self._scheduler.context.messenger,
+            logdir=self._scheduler.context.logdir,
+            logfile=self._logfile,
+            max_retries=self._max_retries,
+            tries=self._tries,
+            message_element_name=self._message_element_name,
+            message_element_key=self._message_element_key,
         )
 
         if self._scheduler.context.platform.does_multiprocessing_start_require_pickling():


[buildstream] 05/16: _signals.terminator: "outermost" -> "is_outermost"

Posted by tv...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tvb pushed a commit to branch aevri/win32_receive_signals
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 23d1f1350baadabd6332de128bab318ed81b34e5
Author: Angelos Evripiotis <je...@bloomberg.net>
AuthorDate: Tue Oct 8 09:59:18 2019 +0100

    _signals.terminator: "outermost" -> "is_outermost"
---
 src/buildstream/_signals.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/src/buildstream/_signals.py b/src/buildstream/_signals.py
index ec598fc..a7f32d6 100644
--- a/src/buildstream/_signals.py
+++ b/src/buildstream/_signals.py
@@ -86,16 +86,16 @@ def terminator(terminate_func):
         yield
         return
 
-    outermost = bool(not terminator_stack)
+    is_outermost = bool(not terminator_stack)
 
     terminator_stack.append(terminate_func)
-    if outermost:
+    if is_outermost:
         original_handler = signal.signal(signal.SIGTERM, terminator_handler)
 
     try:
         yield
     finally:
-        if outermost:
+        if is_outermost:
             signal.signal(signal.SIGTERM, original_handler)
         terminator_stack.pop()
 


[buildstream] 06/16: _signals.suspendable: early-out on win32

Posted by tv...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tvb pushed a commit to branch aevri/win32_receive_signals
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit bfa2763e51b80ce6d4dfdd558d7dc53d855e57ce
Author: Angelos Evripiotis <je...@bloomberg.net>
AuthorDate: Tue Oct 8 09:25:20 2019 +0100

    _signals.suspendable: early-out on win32
---
 src/buildstream/_signals.py | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/src/buildstream/_signals.py b/src/buildstream/_signals.py
index a7f32d6..a29cbdc 100644
--- a/src/buildstream/_signals.py
+++ b/src/buildstream/_signals.py
@@ -144,6 +144,12 @@ def suspend_handler(sig, frame):
 #
 @contextmanager
 def suspendable(suspend_callback, resume_callback):
+    if sys.platform == 'win32':
+        # Win32 does not support SIGTSTP, at least up to Windows 10, so we
+        # won't be able to handle it here.
+        yield
+        return
+
     global suspendable_stack                  # pylint: disable=global-statement
 
     is_outermost = bool(not suspendable_stack)


[buildstream] 16/16: cas: localhost tcp/ip connection when on win32

Posted by tv...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tvb pushed a commit to branch aevri/win32_receive_signals
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 739753bbb54ac958d1762f95992cbc7b4ec8d3fe
Author: Angelos Evripiotis <je...@bloomberg.net>
AuthorDate: Fri Oct 11 13:25:11 2019 +0100

    cas: localhost tcp/ip connection when on win32
    
    To support Windows, add the possibility of connecting to buildbox-casd
    via a localhost connection, instead of a UNIX socket.
---
 src/buildstream/_cas/cascache.py           | 11 ++++++--
 src/buildstream/_cas/casdprocessmanager.py | 45 ++++++++++++++++++++++++++++--
 2 files changed, 52 insertions(+), 4 deletions(-)

diff --git a/src/buildstream/_cas/cascache.py b/src/buildstream/_cas/cascache.py
index 4114d9f..8638e4d 100644
--- a/src/buildstream/_cas/cascache.py
+++ b/src/buildstream/_cas/cascache.py
@@ -26,6 +26,7 @@ import contextlib
 import ctypes
 import multiprocessing
 import signal
+import sys
 import time
 
 import grpc
@@ -38,7 +39,7 @@ from .. import _signals, utils
 from ..types import FastEnum
 from .._exceptions import CASCacheError
 
-from .casdprocessmanager import CASDProcessManager
+from .casdprocessmanager import CASDProcessManager, ConnectionType
 from .casremote import _CASBatchRead, _CASBatchUpdate
 
 _BUFFER_SIZE = 65536
@@ -76,8 +77,14 @@ class CASCache():
 
         if casd:
             log_dir = os.path.join(self.casdir, "logs")
+
+            if sys.platform == 'win32':
+                connection_type = ConnectionType.LOCALHOST_PORT
+            else:
+                connection_type = ConnectionType.UNIX_SOCKET
+
             self._casd_process_manager = CASDProcessManager(
-                path, log_dir, log_level, cache_quota, protect_session_blobs)
+                path, log_dir, log_level, cache_quota, protect_session_blobs, connection_type)
         else:
             self._casd_process_manager = None
 
diff --git a/src/buildstream/_cas/casdprocessmanager.py b/src/buildstream/_cas/casdprocessmanager.py
index fb6caf8..40f945f 100644
--- a/src/buildstream/_cas/casdprocessmanager.py
+++ b/src/buildstream/_cas/casdprocessmanager.py
@@ -21,6 +21,7 @@ import contextlib
 import os
 import shutil
 import signal
+import socket
 import subprocess
 import sys
 import tempfile
@@ -32,9 +33,31 @@ from ..types import FastEnum
 
 _CASD_MAX_LOGFILES = 10
 
+# Note that we want to make sure that BuildStream and buildbox-casd are on the
+# same page about what the hostname is, for that reason we may want to avoid
+# e.g. empty string as the hostname. We also don't want buildbox-casd to accept
+# connections from other machines in this use-case.
+#
+# Note that buildbox-casd will stop with an error if it fails to listen on all
+# addresses, but if it successfully listens on any then it will continue. For
+# this reason we don't want to choose `localhost` as the hostname, otherwise it
+# will also bind to the ipv6 address `::1`.
+#
+_HOSTNAME = "127.0.0.1"
+
 
 class ConnectionType(FastEnum):
     UNIX_SOCKET = 0
+    LOCALHOST_PORT = 1
+
+
+# Note that it's necessary to use the LOCALHOST_PORT option on Windows, because
+# grpc doesn't support AF_UNIX on win32 yet. You can verify this in the grpc
+# source by searching for 'GRPC_HAVE_UNIX_SOCKET'.
+#
+# There also isn't support in grpc for receiving a WSADuplicateSocket, so we
+# can't pass one over. You can verify this in the grpc source by searching for
+# 'WSASocket' and noting that the lpProtocolInfo parameter is always null.
 
 
 # CASDProcessManager
@@ -62,8 +85,11 @@ class CASDProcessManager:
     ):
         self._log_dir = log_dir
 
-        assert connection_type == ConnectionType.UNIX_SOCKET
-        self._connection = _UnixSocketConnection()
+        if connection_type == ConnectionType.UNIX_SOCKET:
+            self._connection = _UnixSocketConnection()
+        else:
+            assert connection_type == ConnectionType.LOCALHOST_PORT
+            self._connection = _LocalhostPortConnection()
 
         casd_args = [utils.get_host_tool('buildbox-casd')]
         casd_args.append('--bind=' + self.connection_string)
@@ -242,6 +268,21 @@ class CASDProcessManager:
         self._failure_callback()
 
 
+class _LocalhostPortConnection:
+    def __init__(self):
+        # Note that there is a race-condition between us finding an available
+        # port and buildbox-casd taking ownership of it. If another process
+        # takes the port in the mean time, we will later fail with an error.
+        with socket.socket() as s:
+            s.bind((_HOSTNAME, 0))
+            hostname, port = s.getsockname()
+        assert hostname == _HOSTNAME
+        self.connection_string = "{}:{}".format(hostname, port)
+
+    def release_resouces(self):
+        pass
+
+
 class _UnixSocketConnection:
     def __init__(self):
         # Place socket in global/user temporary directory to avoid hitting