Posted to commits@buildstream.apache.org by be...@apache.org on 2021/02/24 19:24:30 UTC

[buildstream] branch bschubert/remove-pipe-job updated (2149c97 -> fa32cda)

This is an automated email from the ASF dual-hosted git repository.

benschubert pushed a change to branch bschubert/remove-pipe-job
in repository https://gitbox.apache.org/repos/asf/buildstream.git.


 discard 2149c97  job.py: Completely remove the pipe between child and parent process
 discard 01b4777  job.py: Stop using the queue to send data between the child and parent
 discard c653e31  _messenger.py: Add a hook to explicitly start a new action context
     add 0470fdc  .github/workflows/merge.yml: Publish docs in the docs/ subdirectory
     add 6f603d7  .github/workflows/merge.yml: Fix tab errors in previous commit
     add 20add16  .github/workflows/merge.yml: Fix publishing of docs tarball.
     add f0a0ffd  .asf.yaml: Configure pages to be published from /docs
     add f1f4560  doc/badges.py: Adjustment of badge generation
     add aeb7a8d  doc/badges.py: Adding statically generated HTML page for automated redirects.
     add 8cad3f2  ci: Simplify pipelines using Docker Compose
     add 58fc11c  ci: Enable parallelism for tests
     add a666b0f  Merge pull request #1441 from apache/chandan/compose
     add 8cbbeb1  .asf.yaml: Redirect notifications to the commit list
     add ed48d1b  .github/common.env: Defining TOXENV candidates here
     add 517e7b3  .github/compose/ci.docker-compose.yml: Minor refactoring here
     add e087c8a  .github: Added remote execution CI
     add 443d3ba  Merge pull request #1444 from apache/tristan/remote-execution-testing
     add b51aa86  .github: Refactor how we run the remote execution test
     add 835c4b4  .github: Revived remote cache testing
     add 7e52067  Merge pull request #1445 from apache/tristan/remote-cache-testing
     add e8de0d5  _assetcache.py: Allow explicit re-initialization of remotes.
     add 8171596  element.py: Added internal API _mimic_artifact()
     add ab8f1dc  _artifactelement.py: Override _pull_done()
     add 432c195  _stream.py: Pre-emptive pulling of artifact metadata in some cases
     add 43c8206  _stream.py: Added new _reset() function.
     add 33a5107  tests/frontend/artifact_pull.py: Test pulling artifacts with various deps options
     add 01af64a  tests/frontend/artifact_checkout.py: Test checking out remote artifacts
     add 89d3af2  Merge pull request #1433 from apache/tristan/fix-recursive-artifact-pull
     add 19cd578  Allow certain operations to work without loading a project
     add 1c648cf  tests/frontend/artifact_list_contents.py: Use parametrized tests
     add 4eb2c07  tests/frontend/artifact_list_contents.py: Test listing artifact content without a project
     add 953c0ee  tests/frontend/artifact_checkout.py: Test artifact checkout without project
     add 15f0301  tests/frontend/artifact_delete.py: Test artifact deletion without a project
     add 1e4a48a  tests/frontend/artifact_log.py: Test artifact log without a project
     add 724337b  tests/frontend/artifact_pull.py: Test artifact pull without a project
     add cc66f9e  tests/frontend/artifact_show.py: Test artifact show without a project
     add 7dd7690  Merge pull request #1442 from apache/tristan/optional-project
     add 0b1f29f  .github: Add fedora 33 to CI suite
     add 04ed272  .github: Stop testing against Fedora 31
     add e02af38  Merge pull request #1446 from apache/chandan/fedora-33
     add aa6feac  node.pyx: Fixed error reporting in SequenceNode.as_str_list()
     add b6ad110  node.pyx: Fix segfault when calling MappingNode.get_str_list() with default None
     add 88625fb  node.pyi: Adding some missing type annotations
     add 36594b6  node.pyx, node.pxd, node.pyi: Type checking in MappingNode.get_sequence()
     add 77b0114  tests/internals/yaml.py: Test error reporting of SequenceNode.as_str_list()
     add fbda86d  tests/internals/yaml.py: Test MappingNode.get_str_list() with default None
     add fcb7c8b  Merge pull request #1452 from apache/tristan/yaml-fixes
     add 6957eeb  Temporarily disabling notifications
     add 363ae31  Revert "Temporarily disabling notifications"
     add 93a7a3a  buildelement: switch order of timed_activity and sandbox.batch
     add 3c2cace  Merge pull request #1437 from abderrahim/context-manager-order
     add dd80e0f  .asf.yaml: Publish pages from root of the gh-pages branch
     add 3827f3b  .github/workflows/merge.yml: Refactor docs generation
     add 2d5d486  .github/workflows/merge.yml: Fix docs publishing
     add f26dcd8  .asf.yaml: Updating github metadata.
     add c04352b  .asf.yaml: Explicitly setting pages to publish from the "/"
     add f7f9c97  element.py: Remove local __remote_execution_specs instance variable
     add eda23aa  _remotespec.py: Moving RemoteSpec and RemoteExecutionSpec to its own file
     add 497e1ca  Remote execution can only be configured in user configuration
     add 8cfa27f  plugins/elements/junction.py: Remove ambiguous cache related junction parameters
     add a61f9f1  _remotespec.py: Modified format to put the certs in an "auth" dictionary.
     add b8d1de7  Refactor remote asset user configuration
     add 300e813  node.pyi: Fleshing out more type information about nodes.
     add fd51cf2  _context.py: Changed artifact and source cache configuration again.
     add e89a17a  _frontend/widget.py: Print remote execution setup and project cache servers
     add d1d9f33  NEWS: Document breaking change, removed support for deprecated RE config
     add 10ad071  NEWS: Documenting breaking change, redesign of remote cache configuration
     add 7a35498  doc: Redocumenting artifact/source cache servers.
     add c143131  Merge pull request #1434 from apache/tristan/change-remote-config
     add 5034c9c  .github/compose/ci.remote-execution.yml: Fix tests
     add a7fd921  Merge pull request #1444 from apache/tristan/fix-remote-execution-tests
     add 77f2f48  _frontend/cli.py: Removing support for deprecated commands.
     add b05dd1f  _remotespec.py: Adding RemoteSpec.new_from_string()
     add 4b90394  Moving HostMount from _project.py -> types.py
     add 97deb41  Moving some cython functions into _utils.pyx
     add d2c9c89  Support specifying remotes on the command line.
     add f0e2e79  doc: Documenting how to specify remotes on the command line.
     add a292743  NEWS: Updating with some new breaking changes
     add d4cb3be  Merge pull request #1438 from apache/tristan/remote-cli-options
     add 7fcb131  _context.py: Remove `prompt` as a valid configuration key
     add b646a8f  _frontend/widget.py: Format full element names in `bst show --format %{deps}`
     add 651bb00  doc/source/using_config.rst: Completely and thoroughly document user config.
     add aabb43e  src/buildstream/data/userconfig.yaml: Remove some comments.
     add 822b83f  Merge pull request #1442 from apache/tristan/user-config-docs
     add e66910d  _frontend/cli.py: Add `--except` option to `bst source push`
     add 9de6fb3  Merge pull request #1445 from apache/tristan/add-except-option
     add 4f617ed  doc/source/main_install.rst: Update BuildBox components to 0.0.38
     add 5ebe6ab  Merge pull request #1443 from apache/juerg/buildbox
     new 13ac060  _messenger.py: Add a hook to explicitly start a new action context
     new a05e05a  job.py: Stop using the queue to send data between the child and parent
     new fa32cda  job.py: Completely remove the pipe between child and parent process

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (2149c97)
            \
             N -- N -- N   refs/heads/bschubert/remove-pipe-job (fa32cda)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .asf.yaml                                          |   12 +
 .github/common.env                                 |    6 +
 .github/compose/buildbarn-config/asset.jsonnet     |   32 +
 .github/compose/buildbarn-config/storage.jsonnet   |   26 +
 .github/compose/ci.buildbarn-remote-cache.yml      |   55 +
 .github/compose/ci.buildstream-remote-cache.yml    |   36 +
 .github/compose/ci.docker-compose.yml              |  112 ++
 .github/compose/ci.remote-execution.yml            |   67 ++
 .github/workflows/ci.yml                           |  189 ++--
 .github/workflows/merge.yml                        |   49 +-
 .github/workflows/release.yml                      |   26 +-
 .pylintrc                                          |    2 -
 NEWS                                               |   18 +
 README.rst                                         |    4 +-
 doc/Makefile                                       |    4 +-
 doc/badges.py                                      |   45 +-
 doc/source/format_project.rst                      |  162 +--
 doc/source/main_install.rst                        |   12 +-
 doc/source/using_commands.rst                      |   71 ++
 doc/source/using_config.rst                        | 1128 +++++++++++++++++---
 doc/source/using_configuring_cache_server.rst      |   57 +-
 setup.py                                           |    2 -
 src/buildstream/_artifactcache.py                  |   49 +-
 src/buildstream/_artifactelement.py                |   11 +
 src/buildstream/_assetcache.py                     |  423 +++-----
 src/buildstream/_cas/casremote.py                  |   12 +-
 src/buildstream/_context.py                        |  429 ++++++--
 src/buildstream/_elementsourcescache.py            |   13 +-
 src/buildstream/_frontend/app.py                   |   25 +-
 src/buildstream/_frontend/cli.py                   |  420 +++++---
 src/buildstream/_frontend/widget.py                |   93 +-
 src/buildstream/_loader/_loader.pyi                |    1 -
 src/buildstream/_loader/_loader.pyx                |   52 -
 src/buildstream/_loader/loader.py                  |    4 +-
 src/buildstream/_platform/platform.py              |    2 +-
 src/buildstream/_project.py                        |   91 +-
 src/buildstream/_remote.py                         |  205 +---
 src/buildstream/_remotespec.py                     |  493 +++++++++
 src/buildstream/_scheduler/jobs/_job.pyi           |    1 -
 src/buildstream/_scheduler/jobs/_job.pyx           |   15 -
 src/buildstream/_scheduler/jobs/job.py             |    2 +-
 src/buildstream/_sourcecache.py                    |   15 +-
 src/buildstream/_stream.py                         |  608 +++++++----
 src/buildstream/_utils.pyi                         |    2 +
 src/buildstream/_utils.pyx                         |   50 +
 src/buildstream/_workspaces.py                     |   10 +-
 src/buildstream/buildelement.py                    |    2 +-
 src/buildstream/data/userconfig.yaml               |   55 +-
 src/buildstream/element.py                         |   58 +-
 src/buildstream/node.pxd                           |    2 +-
 src/buildstream/node.pyi                           |   84 +-
 src/buildstream/node.pyx                           |   82 +-
 src/buildstream/plugins/elements/junction.py       |   13 +-
 src/buildstream/plugins/elements/stack.py          |    2 +-
 src/buildstream/sandbox/_sandboxremote.py          |  180 +---
 src/buildstream/testing/runcli.py                  |    4 +-
 src/buildstream/types.py                           |   22 +-
 tests/artifactcache/capabilities.py                |    6 +-
 tests/artifactcache/config.py                      |  130 +--
 tests/artifactcache/junctions.py                   |  159 +--
 tests/artifactcache/pull.py                        |   15 +-
 tests/artifactcache/push.py                        |   24 +-
 tests/format/project.py                            |    7 +-
 tests/frontend/artifact_checkout.py                |   84 ++
 tests/frontend/artifact_delete.py                  |   11 +-
 tests/frontend/artifact_list_contents.py           |  107 +-
 tests/frontend/artifact_log.py                     |   34 +-
 tests/frontend/artifact_pull.py                    |   83 ++
 tests/frontend/artifact_show.py                    |    9 +-
 tests/frontend/buildcheckout.py                    |    6 +-
 tests/frontend/completions.py                      |   13 +-
 tests/frontend/default_target.py                   |    2 +-
 tests/frontend/help.py                             |    4 +-
 tests/frontend/large_directory.py                  |    2 +-
 .../foo.bst => project/elements/target-import.bst} |    4 +
 tests/frontend/pull.py                             |   68 +-
 tests/frontend/push.py                             |   99 +-
 tests/frontend/remote-caches.py                    |    8 +-
 tests/frontend/workspace.py                        |    2 +-
 tests/integration/artifact.py                      |    6 +-
 tests/integration/cachedfail.py                    |    8 +-
 tests/integration/pullbuildtrees.py                |   16 +-
 tests/integration/shell.py                         |    2 +-
 tests/integration/shellbuildtrees.py               |   12 +-
 tests/internals/yaml.py                            |   31 +
 tests/internals/yaml/list-of-dict.yaml             |    5 +
 tests/internals/yaml/list-of-list.yaml             |    3 +
 tests/remotecache/simple.py                        |    2 +-
 tests/remoteexecution/buildtree.py                 |    4 +-
 tests/remoteexecution/partial.py                   |    5 +-
 tests/sandboxes/remote-exec-config.py              |   63 +-
 .../missing-certs}/project.conf                    |    0
 tests/sourcecache/capabilities.py                  |   12 +-
 tests/sourcecache/fetch.py                         |    6 +-
 tests/sourcecache/push.py                          |   20 +-
 tests/sourcecache/workspace.py                     |    6 +-
 96 files changed, 4230 insertions(+), 2493 deletions(-)
 create mode 100644 .github/common.env
 create mode 100644 .github/compose/buildbarn-config/asset.jsonnet
 create mode 100644 .github/compose/buildbarn-config/storage.jsonnet
 create mode 100644 .github/compose/ci.buildbarn-remote-cache.yml
 create mode 100644 .github/compose/ci.buildstream-remote-cache.yml
 create mode 100644 .github/compose/ci.docker-compose.yml
 create mode 100644 .github/compose/ci.remote-execution.yml
 delete mode 100644 src/buildstream/_loader/_loader.pyi
 delete mode 100644 src/buildstream/_loader/_loader.pyx
 create mode 100644 src/buildstream/_remotespec.py
 delete mode 100644 src/buildstream/_scheduler/jobs/_job.pyi
 delete mode 100644 src/buildstream/_scheduler/jobs/_job.pyx
 create mode 100644 tests/frontend/artifact_checkout.py
 create mode 100644 tests/frontend/artifact_pull.py
 copy tests/frontend/{logging/elements/foo.bst => project/elements/target-import.bst} (54%)
 create mode 100644 tests/internals/yaml/list-of-dict.yaml
 create mode 100644 tests/internals/yaml/list-of-list.yaml
 copy tests/sandboxes/{missing-command => remote-exec-config/missing-certs}/project.conf (100%)


[buildstream] 03/03: job.py: Completely remove the pipe between child and parent process

Posted by be...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

benschubert pushed a commit to branch bschubert/remove-pipe-job
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit fa32cda5aaf57504722f07ac47ba1f6bfa0f1800
Author: Benjamin Schubert <co...@benschubert.me>
AuthorDate: Mon Jan 11 12:00:07 2021 +0000

    job.py: Completely remove the pipe between child and parent process
    
    This pipe is not needed at all anymore
---
 src/buildstream/_scheduler/jobs/job.py | 80 ++--------------------------------
 1 file changed, 3 insertions(+), 77 deletions(-)

diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index e7ac23f..08f1a66 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -24,7 +24,6 @@
 import asyncio
 import datetime
 import itertools
-import multiprocessing
 import threading
 import traceback
 
@@ -113,8 +112,6 @@ class Job:
         #
         self._scheduler = scheduler  # The scheduler
         self._messenger = self._scheduler.context.messenger
-        self._pipe_r = None  # The read end of a pipe for message passing
-        self._listening = False  # Whether the parent is currently listening
         self._suspended = False  # Whether this job is currently suspended
         self._max_retries = max_retries  # Maximum number of automatic retries
         self._result = None  # Return value of child action in the parent
@@ -143,11 +140,7 @@ class Job:
 
         assert not self._terminated, "Attempted to start process which was already terminated"
 
-        # FIXME: remove this, this is not necessary when using asyncio
-        self._pipe_r, pipe_w = multiprocessing.Pipe(duplex=False)
-
         self._tries += 1
-        self._parent_start_listening()
 
         # FIXME: remove the parent/child separation, it's not needed anymore.
         self._child = self.create_child_job(  # pylint: disable=assignment-from-no-return
@@ -164,7 +157,7 @@ class Job:
         loop = asyncio.get_event_loop()
 
         async def execute():
-            ret_code, self._result = await loop.run_in_executor(None, self._child.child_action, pipe_w)
+            ret_code, self._result = await loop.run_in_executor(None, self._child.child_action)
             await self._parent_child_completed(ret_code)
 
         self._task = loop.create_task(execute())
@@ -178,9 +171,6 @@ class Job:
     def terminate(self):
         self.message(MessageType.STATUS, "{} terminating".format(self.action_name))
 
-        # Make sure there is no garbage on the pipe
-        self._parent_stop_listening()
-
         if self._task:
             self._child.terminate()
 
@@ -289,16 +279,6 @@ class Job:
     #                  Local Private Methods              #
     #######################################################
 
-    # _parent_shutdown()
-    #
-    # Shuts down the Job on the parent side by reading any remaining
-    # messages on the message pipe and cleaning up any resources.
-    #
-    def _parent_shutdown(self):
-        # Make sure we've read everything we need and then stop listening
-        self._parent_process_pipe()
-        self._parent_stop_listening()
-
     # _parent_child_completed()
     #
     # Called in the main process courtesy of asyncio's ChildWatcher.add_child_handler()
@@ -307,8 +287,6 @@ class Job:
     #    returncode (int): The return code of the child process
     #
     async def _parent_child_completed(self, returncode):
-        self._parent_shutdown()
-
         try:
             returncode = _ReturnCode(returncode)
         except ValueError:
@@ -347,50 +325,7 @@ class Job:
 
         self.parent_complete(status, self._result)
         self._scheduler.job_completed(self, status)
-
-        # Force the deletion of the pipe and process objects to try and clean up FDs
-        self._pipe_r.close()
-        self._pipe_r = self._task = None
-
-    # _parent_process_pipe()
-    #
-    # Reads back message envelopes from the message pipe
-    # in the parent process.
-    #
-    def _parent_process_pipe(self):
-        while self._pipe_r.poll():
-            try:
-                self._pipe_r.recv()
-                assert False, "No message should be received anymore"
-            except EOFError:
-                self._parent_stop_listening()
-                break
-
-    # _parent_recv()
-    #
-    # A callback to handle I/O events from the message
-    # pipe file descriptor in the main process message loop
-    #
-    def _parent_recv(self, *args):
-        self._parent_process_pipe()
-
-    # _parent_start_listening()
-    #
-    # Starts listening on the message pipe
-    #
-    def _parent_start_listening(self):
-        if not self._listening:
-            self._scheduler.loop.add_reader(self._pipe_r.fileno(), self._parent_recv)
-            self._listening = True
-
-    # _parent_stop_listening()
-    #
-    # Stops listening on the message pipe
-    #
-    def _parent_stop_listening(self):
-        if self._listening:
-            self._scheduler.loop.remove_reader(self._pipe_r.fileno())
-            self._listening = False
+        self._task = None
 
 
 # ChildJob()
@@ -431,7 +366,6 @@ class ChildJob:
         self._message_element_name = message_element_name
         self._message_element_key = message_element_key
 
-        self._pipe_w = None  # The write end of a pipe for message passing
         self._thread_id = None  # Thread in which the child executes its action
         self._should_terminate = False
         self._terminate_lock = threading.Lock()
@@ -483,15 +417,9 @@ class ChildJob:
     #
     # Perform the action in the child process, this calls the action_cb.
     #
-    # Args:
-    #    pipe_w (multiprocessing.connection.Connection): The message pipe for IPC
-    #
-    def child_action(self, pipe_w):
-        # Assign the pipe we passed across the process boundaries
-        #
+    def child_action(self):
         # Set the global message handler in this child
         # process to forward messages to the parent process
-        self._pipe_w = pipe_w
         self._messenger.setup_new_action_context(
             self.action_name, self._message_element_name, self._message_element_key
         )
@@ -572,8 +500,6 @@ class ChildJob:
             except TerminateException:
                 self._thread_id = None
                 return _ReturnCode.TERMINATED, None
-            finally:
-                self._pipe_w.close()
 
     # terminate()
     #
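
Viewed from outside the diff, the pattern this commit moves to is that the child action's return value now travels back through the asyncio executor future itself, with no pipe in between. A minimal standalone sketch of that pattern (hypothetical ChildAction/run_job names, not BuildStream's actual classes) could look like:

    import asyncio

    class ChildAction:
        """Stand-in for a child job: does some work on a worker thread."""

        def run(self):
            # Previously a result like this would have been shipped over a
            # multiprocessing pipe; now it is simply returned and surfaces
            # through the future that run_in_executor() gives us.
            return 0, "result-payload"

    async def run_job():
        loop = asyncio.get_running_loop()
        child = ChildAction()
        # Run the blocking child action on the default thread pool and await
        # its return value directly in the parent coroutine.
        ret_code, result = await loop.run_in_executor(None, child.run)
        print("child finished:", ret_code, result)

    asyncio.run(run_job())

With the result delivered this way, the read/write pipe ends, the reader registration on the scheduler loop, and the drain-on-shutdown logic all become dead weight, which is what the patch deletes.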


[buildstream] 02/03: job.py: Stop using the queue to send data between the child and parent

Posted by be...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

benschubert pushed a commit to branch bschubert/remove-pipe-job
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit a05e05aad44c409546e265280d1ad388cc5fdd85
Author: Benjamin Schubert <co...@benschubert.me>
AuthorDate: Mon Jan 11 11:48:35 2021 +0000

    job.py: Stop using the queue to send data between the child and parent
    
    This removes the need to have all messages processed in the master
    thread, and instead allows them to be done in any thread.
    
    * _messenger.py:
    
      - Store optional job information in the thread local
        storage and expand the message with it if it is present.
    
      - Make the message handler something global and remove the need to
        have a thread-specific one.
    
      - Have message filter out silenced and LOG messages from jobs
    
    * job.py: Remove the job-specific message handler
---
 src/buildstream/_messenger.py          | 56 ++++++++++++++++++++++++++--------
 src/buildstream/_scheduler/jobs/job.py | 45 ++++-----------------------
 2 files changed, 49 insertions(+), 52 deletions(-)

diff --git a/src/buildstream/_messenger.py b/src/buildstream/_messenger.py
index 01a8cfd..edb79ec 100644
--- a/src/buildstream/_messenger.py
+++ b/src/buildstream/_messenger.py
@@ -25,7 +25,7 @@ from typing import Optional, Callable, Iterator, TextIO
 
 from . import _signals
 from ._exceptions import BstError
-from ._message import Message, MessageType
+from ._message import Message, MessageType, unconditional_messages
 from ._state import State, Task
 
 
@@ -48,6 +48,13 @@ class _TimeData:
         self.start_time: datetime.datetime = start_time
 
 
+class _JobInfo:
+    def __init__(self, action_name: str, element_name: str, element_key: str) -> None:
+        self.action_name = action_name
+        self.element_name = element_name
+        self.element_key = element_key
+
+
 # _MessengerLocal
 #
 # Thread local storage for the messenger
@@ -56,13 +63,6 @@ class _MessengerLocal(threading.local):
     def __init__(self) -> None:
         super().__init__()
 
-        # The callback to call when propagating messages
-        #
-        # FIXME: The message handler is currently not strongly typed,
-        #        as it uses a kwarg, we cannot declare it with Callable.
-        #        We can use `Protocol` to strongly type this with python >= 3.8
-        self.message_handler = None
-
         # The open file handle for this task
         self.log_handle: Optional[TextIO] = None
 
@@ -72,6 +72,9 @@ class _MessengerLocal(threading.local):
         # Level of silent messages depth in this task
         self.silence_scope_depth: int = 0
 
+        # Job
+        self.job: Optional[_JobInfo] = None
+
 
 # Messenger()
 #
@@ -97,8 +100,16 @@ class Messenger:
         # Thread local storage
         self._locals: _MessengerLocal = _MessengerLocal()
 
-    def setup_new_action_context(self) -> None:
+        # The callback to call when propagating messages
+        #
+        # FIXME: The message handler is currently not strongly typed,
+        #        as it uses a kwarg, we cannot declare it with Callable.
+        #        We can use `Protocol` to strongly type this with python >= 3.8
+        self._message_handler = None
+
+    def setup_new_action_context(self, action_name: str, element_name: str, element_key: str) -> None:
         self._locals.silence_scope_depth = 0
+        self._locals.job = _JobInfo(action_name, element_name, element_key)
 
     # set_message_handler()
     #
@@ -106,7 +117,7 @@ class Messenger:
     # the messenger.
     #
     def set_message_handler(self, handler) -> None:
-        self._locals.message_handler = handler
+        self._message_handler = handler
 
     # set_state()
     #
@@ -140,12 +151,31 @@ class Messenger:
         # If we are recording messages, dump a copy into the open log file.
         self._record_message(message)
 
+        # Always add the log filename automatically
+        message.logfile = self._locals.log_filename
+
+        is_silenced = self._silent_messages()
+        job = self._locals.job
+
+        if job is not None:
+            # Automatically add message information from the job context
+            message.action_name = job.action_name
+            message.task_element_name = job.element_name
+            message.task_element_key = job.element_key
+
+            # Don't forward LOG messages from jobs
+            if message.message_type == MessageType.LOG:
+                return
+
+            # Don't forward JOB messages if they are currently silent
+            if is_silenced and (message.message_type not in unconditional_messages):
+                return
+
         # Send it off to the log handler (can be the frontend,
         # or it can be the child task which will propagate
         # to the frontend)
-        assert self._locals.message_handler
-
-        self._locals.message_handler(message, is_silenced=self._silent_messages())
+        assert self._message_handler
+        self._message_handler(message, is_silenced=is_silenced)
 
     # status():
     #
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index 6cd11ee..e7ac23f 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -32,7 +32,7 @@ import traceback
 from ... import utils
 from ..._utils import terminate_thread
 from ..._exceptions import ImplError, BstError, set_last_task_error, SkipJob
-from ..._message import Message, MessageType, unconditional_messages
+from ..._message import Message, MessageType
 from ...types import FastEnum
 from ..._signals import TerminateException
 
@@ -360,13 +360,12 @@ class Job:
     def _parent_process_pipe(self):
         while self._pipe_r.poll():
             try:
-                message = self._pipe_r.recv()
+                self._pipe_r.recv()
+                assert False, "No message should be received anymore"
             except EOFError:
                 self._parent_stop_listening()
                 break
 
-            self._messenger.message(message)
-
     # _parent_recv()
     #
     # A callback to handle I/O events from the message
@@ -493,8 +492,9 @@ class ChildJob:
         # Set the global message handler in this child
         # process to forward messages to the parent process
         self._pipe_w = pipe_w
-        self._messenger.setup_new_action_context()
-        self._messenger.set_message_handler(self._child_message_handler)
+        self._messenger.setup_new_action_context(
+            self.action_name, self._message_element_name, self._message_element_key
+        )
 
         # Time, log and and run the action function
         #
@@ -593,36 +593,3 @@ class ChildJob:
                 return
 
         terminate_thread(self._thread_id)
-
-    #######################################################
-    #                  Local Private Methods              #
-    #######################################################
-
-    # _child_message_handler()
-    #
-    # A Context delegate for handling messages, this replaces the
-    # frontend's main message handler in the context of a child task
-    # and performs local logging to the local log file before sending
-    # the message back to the parent process for further propagation.
-    # The related element display key is added to the message for
-    # widget rendering if not already set for an element childjob.
-    #
-    # Args:
-    #    message     (Message): The message to log
-    #    is_silenced (bool)   : Whether messages are silenced
-    #
-    def _child_message_handler(self, message, is_silenced):
-
-        message.action_name = self.action_name
-        message.task_element_name = self._message_element_name
-        message.task_element_key = self._message_element_key
-
-        # Send to frontend if appropriate
-        if is_silenced and (message.message_type not in unconditional_messages):
-            return
-
-        # Don't bother propagating these to the frontend
-        if message.message_type == MessageType.LOG:
-            return
-
-        self._pipe_w.send(message)
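
The heart of this patch is the split between per-job metadata, which lives in thread-local storage, and a single message handler shared by every thread, with job-context decoration and filtering of LOG messages happening at the point of emission. A rough self-contained sketch of that arrangement (simplified names, not the real Messenger API) might be:

    import threading

    LOG = "log"        # illustrative message types, not BuildStream's enum
    STATUS = "status"

    class _Local(threading.local):
        def __init__(self):
            super().__init__()
            self.job = None          # per-thread job metadata, if any

    class MiniMessenger:
        def __init__(self):
            self._locals = _Local()
            self._handler = None     # one handler shared by all threads

        def set_handler(self, handler):
            self._handler = handler

        def setup_new_action_context(self, action_name, element_name):
            # Called at the start of each job, in the job's own thread
            self._locals.job = (action_name, element_name)

        def message(self, message_type, text):
            job = self._locals.job
            if job is not None:
                # Don't forward LOG messages from jobs
                if message_type == LOG:
                    return
                # Decorate the message with the job context
                text = "[{}:{}] {}".format(job[0], job[1], text)
            self._handler(message_type, text)

    messenger = MiniMessenger()
    messenger.set_handler(lambda t, m: print(t, m))

    def job_thread():
        messenger.setup_new_action_context("build", "hello.bst")
        messenger.message(STATUS, "starting")    # forwarded, with job context
        messenger.message(LOG, "verbose noise")  # filtered out

    t = threading.Thread(target=job_thread)
    t.start()
    t.join()
    messenger.message(STATUS, "scheduler message")  # no job context here

Because the handler is global and the job context is looked up per thread, messages no longer need to funnel through a pipe to the master thread before being decorated and forwarded.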


[buildstream] 01/03: _messenger.py: Add a hook to explicitly start a new action context

Posted by be...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

benschubert pushed a commit to branch bschubert/remove-pipe-job
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 13ac06015f642cc61392c3e6f5f49e81a0a023cd
Author: Benjamin Schubert <co...@benschubert.me>
AuthorDate: Mon Jan 11 09:34:56 2021 +0000

    _messenger.py: Add a hook to explicitly start a new action context
    
    This hook overrides the locals from the messenger, and ensures that we
    do not leak information between actions.
    
    This is required, as we reuse the threads between actions.
    We used to re-initialize it in the `recorded_messages` method, but that
    is an odd side effect.
    
    * job.py: Make a new context upon starting the new job
---
 src/buildstream/_messenger.py          | 4 +++-
 src/buildstream/_scheduler/jobs/job.py | 1 +
 2 files changed, 4 insertions(+), 1 deletion(-)

diff --git a/src/buildstream/_messenger.py b/src/buildstream/_messenger.py
index 3bd98cd..01a8cfd 100644
--- a/src/buildstream/_messenger.py
+++ b/src/buildstream/_messenger.py
@@ -97,6 +97,9 @@ class Messenger:
         # Thread local storage
         self._locals: _MessengerLocal = _MessengerLocal()
 
+    def setup_new_action_context(self) -> None:
+        self._locals.silence_scope_depth = 0
+
     # set_message_handler()
     #
     # Sets the handler for any status messages propagated through
@@ -362,7 +365,6 @@ class Messenger:
         # Create the fully qualified logfile in the log directory,
         # appending the pid and .log extension at the end.
         self._locals.log_filename = os.path.join(logdir, "{}.{}.log".format(filename, os.getpid()))
-        self._locals.silence_scope_depth = 0
 
         # Ensure the directory exists first
         directory = os.path.dirname(self._locals.log_filename)
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index c875afe..6cd11ee 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -493,6 +493,7 @@ class ChildJob:
         # Set the global message handler in this child
         # process to forward messages to the parent process
         self._pipe_w = pipe_w
+        self._messenger.setup_new_action_context()
         self._messenger.set_message_handler(self._child_message_handler)
 
         # Time, log and and run the action function
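
Because worker threads are reused from one action to the next, any per-action state kept in thread-local storage has to be reset explicitly at the start of each action, which is what the new hook provides. A small sketch of the failure mode and the reset (illustrative names; only silence_scope_depth mirrors the actual field touched by this patch):

    import threading
    from concurrent.futures import ThreadPoolExecutor

    class ActionLocals(threading.local):
        def __init__(self):
            super().__init__()
            self.silence_scope_depth = 0

    _locals = ActionLocals()

    def setup_new_action_context():
        # Explicitly reset per-action state; because worker threads are
        # reused, leftovers from a previous action would otherwise leak in.
        _locals.silence_scope_depth = 0

    def run_action(name, leak):
        setup_new_action_context()
        if leak:
            # Simulate an action that bumps the depth and "forgets" to restore it
            _locals.silence_scope_depth += 1
        return name, _locals.silence_scope_depth

    # A single worker thread guarantees both actions share one thread,
    # mirroring how jobs reuse threads between actions.
    with ThreadPoolExecutor(max_workers=1) as pool:
        print(pool.submit(run_action, "first", True).result())    # ('first', 1)
        print(pool.submit(run_action, "second", False).result())  # ('second', 0)

Dropping the setup_new_action_context() call would make the second action start at depth 1: state from an unrelated action leaks through the shared thread, which is exactly the side effect the commit moves out of recorded_messages() and into an explicit hook.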