You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by be...@apache.org on 2021/02/24 19:24:30 UTC
[buildstream] branch bschubert/remove-pipe-job updated (2149c97 ->
fa32cda)
This is an automated email from the ASF dual-hosted git repository.
benschubert pushed a change to branch bschubert/remove-pipe-job
in repository https://gitbox.apache.org/repos/asf/buildstream.git.
discard 2149c97 job.py: Completely remove the pipe between child and parent process
discard 01b4777 job.py: Stop using the queue to send data between the child and parent
discard c653e31 _messenger.py: Add a hook to explicitely start a new action context
add 0470fdc .github/workflows/merge.yml: Publish docs in the docs/ subdirectory
add 6f603d7 .github/workflows/merge.yml: Fix tab errors in previous commit
add 20add16 .github/workflows/merge.yml: Fix publishing of docs tarball.
add f0a0ffd .asf.yaml: Configure pages to be published from /docs
add f1f4560 doc/badges.py: Adjustment of badge generation
add aeb7a8d doc/badges.py: Adding staticly generated HTML page for automated redirects.
add 8cad3f2 ci: Simplify pipelines using Docker Compose
add 58fc11c ci: Enable parallelism for tests
add a666b0f Merge pull request #1441 from apache/chandan/compose
add 8cbbeb1 .asf.yaml: Redirect notifications to the commit list
add ed48d1b .github/common.env: Defining TOXENV candidates here
add 517e7b3 .github/compose/ci.docker-compose.yml: Minor refactoring here
add e087c8a .github: Added remote execution CI
add 443d3ba Merge pull request #1444 from apache/tristan/remote-execution-testing
add b51aa86 .github: Refactor how we run the remote execution test
add 835c4b4 .github: Revived remote cache testing
add 7e52067 Merge pull request #1445 from apache/tristan/remote-cache-testing
add e8de0d5 _assetcache.py: Allow explicit re-initialization of remotes.
add 8171596 element.py: Added internal API _mimic_artifact()
add ab8f1dc _artifactelement.py: Override _pull_done()
add 432c195 _stream.py: Pre-emptive pulling of artifact metadata in some cases
add 43c8206 _stream.py: Added new _reset() function.
add 33a5107 tests/frontend/artifact_pull.py: Test pulling artifacts with various deps options
add 01af64a tests/frontend/artifact_checkout.py: Test checking out remote artifacts
add 89d3af2 Merge pull request #1433 from apache/tristan/fix-recursive-artifact-pull
add 19cd578 Allow certain operations to work without loading a project
add 1c648cf tests/frontend/artifact_list_contents.py: Use parametrized tests
add 4eb2c07 tests/frontend/artifact_list_contents.py: Test listing artifact content without a project
add 953c0ee tests/frontend/artifact_checkout.py: Test artifact checkout without project
add 15f0301 tests/frontend/artifact_delete.py: Test artifact deletion without a project
add 1e4a48a tests/frontend/artifact_log.py: Test artifact log without a project
add 724337b tests/frontend/artifact_pull.py: Test artifact pull without a project
add cc66f9e tests/frontend/artifact_show.py: Test artifact show without a project
add 7dd7690 Merge pull request #1442 from apache/tristan/optional-project
add 0b1f29f .github: Add fedora 33 to CI suite
add 04ed272 .github: Stop testing against Fedora 31
add e02af38 Merge pull request #1446 from apache/chandan/fedora-33
add aa6feac node.pyx: Fixed error reporting in SequenceNode.as_str_list()
add b6ad110 node.pyx: Fix segfault when calling MappingNode.get_str_list() with default None
add 88625fb node.pyi: Adding some missing type annotations
add 36594b6 node.pyx, node.pxd, node.pyi: Type checking in MappingNode.get_sequence()
add 77b0114 tests/internals/yaml.py: Test error reporting of SequenceNode.as_str_list()
add fbda86d tests/internals/yaml.py: Test MappingNode.get_str_list() with default None
add fcb7c8b Merge pull request #1452 from apache/tristan/yaml-fixes
add 6957eeb Temporarily disabling notifications
add 363ae31 Revert "Temporarily disabling notifications"
add 93a7a3a buildelement: switch order of timed_activity and sandbox.batch
add 3c2cace Merge pull request #1437 from abderrahim/context-manager-order
add dd80e0f .asf.yaml: Publish pages from root of the gh-pages branch
add 3827f3b .github/workflows/merge.yml: Refactor docs generation
add 2d5d486 .github/workflows/merge.yml: Fix docs publishing
add f26dcd8 .asf.yaml: Updating github metadata.
add c04352b .asf.yaml: Explicitly setting pages to publish from the "/"
add f7f9c97 element.py: Remove local __remote_execution_specs instance variable
add eda23aa _remotespec.py: Moving RemoteSpec and RemoteExecutionSpec to its own file
add 497e1ca Remote execution can only be configured in user configuration
add 8cfa27f plugins/elements/junction.py: Remove ambiguous cache related junction parameters
add a61f9f1 _remotespec.py: Modified format to put the certs in an "auth" dictionary.
add b8d1de7 Refactor remote asset user configuration
add 300e813 node.pyi: Fleshing out more type information about nodes.
add fd51cf2 _context.py: Changed artifact and source cache configuration again.
add e89a17a _frontend/widget.py: Print remote execution setup and project cache servers
add d1d9f33 NEWS: Document breaking change, removed support for deprecated RE config
add 10ad071 NEWS: Documenting breaking change, redesign of remote cache configuration
add 7a35498 doc: Redocumenting artifact/source cache servers.
add c143131 Merge pull request #1434 from apache/tristan/change-remote-config
add 5034c9c .github/compose/ci.remote-execution.yml: Fix tests
add a7fd921 Merge pull request #1444 from apache/tristan/fix-remote-execution-tests
add 77f2f48 _frontend/cli.py: Removing support for deprecated commands.
add b05dd1f _remotespec.py: Adding RemoteSpec.new_from_string()
add 4b90394 Moving HostMount from _project.py -> types.py
add 97deb41 Moving some cython functions into _utils.pyx
add d2c9c89 Support specifying remotes on the command line.
add f0e2e79 doc: Documenting how to specify remotes on the command line.
add a292743 NEWS: Updating with some new breaking changes
add d4cb3be Merge pull request #1438 from apache/tristan/remote-cli-options
add 7fcb131 _context.py: Remote `prompt` as a valid configuration key
add b646a8f _frontend/widget.py: Format full element names in `bst show --format %{deps}`
add 651bb00 doc/source/using_config.rst: Completely and thoroughly document user config.
add aabb43e src/buildstream/data/userconfig.yaml: Remove some comments.
add 822b83f Merge pull request #1442 from apache/tristan/user-config-docs
add e66910d _frontend/cli.py: Add `--except` option to `bst source push`
add 9de6fb3 Merge pull request #1445 from apache/tristan/add-except-option
add 4f617ed doc/source/main_install.rst: Update BuildBox components to 0.0.38
add 5ebe6ab Merge pull request #1443 from apache/juerg/buildbox
new 13ac060 _messenger.py: Add a hook to explicitely start a new action context
new a05e05a job.py: Stop using the queue to send data between the child and parent
new fa32cda job.py: Completely remove the pipe between child and parent process
This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version. This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:
* -- * -- B -- O -- O -- O (2149c97)
\
N -- N -- N refs/heads/bschubert/remove-pipe-job (fa32cda)
You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.
Any revisions marked "omit" are not gone; other references still
refer to them. Any revisions marked "discard" are gone forever.
The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
.asf.yaml | 12 +
.github/common.env | 6 +
.github/compose/buildbarn-config/asset.jsonnet | 32 +
.github/compose/buildbarn-config/storage.jsonnet | 26 +
.github/compose/ci.buildbarn-remote-cache.yml | 55 +
.github/compose/ci.buildstream-remote-cache.yml | 36 +
.github/compose/ci.docker-compose.yml | 112 ++
.github/compose/ci.remote-execution.yml | 67 ++
.github/workflows/ci.yml | 189 ++--
.github/workflows/merge.yml | 49 +-
.github/workflows/release.yml | 26 +-
.pylintrc | 2 -
NEWS | 18 +
README.rst | 4 +-
doc/Makefile | 4 +-
doc/badges.py | 45 +-
doc/source/format_project.rst | 162 +--
doc/source/main_install.rst | 12 +-
doc/source/using_commands.rst | 71 ++
doc/source/using_config.rst | 1128 +++++++++++++++++---
doc/source/using_configuring_cache_server.rst | 57 +-
setup.py | 2 -
src/buildstream/_artifactcache.py | 49 +-
src/buildstream/_artifactelement.py | 11 +
src/buildstream/_assetcache.py | 423 +++-----
src/buildstream/_cas/casremote.py | 12 +-
src/buildstream/_context.py | 429 ++++++--
src/buildstream/_elementsourcescache.py | 13 +-
src/buildstream/_frontend/app.py | 25 +-
src/buildstream/_frontend/cli.py | 420 +++++---
src/buildstream/_frontend/widget.py | 93 +-
src/buildstream/_loader/_loader.pyi | 1 -
src/buildstream/_loader/_loader.pyx | 52 -
src/buildstream/_loader/loader.py | 4 +-
src/buildstream/_platform/platform.py | 2 +-
src/buildstream/_project.py | 91 +-
src/buildstream/_remote.py | 205 +---
src/buildstream/_remotespec.py | 493 +++++++++
src/buildstream/_scheduler/jobs/_job.pyi | 1 -
src/buildstream/_scheduler/jobs/_job.pyx | 15 -
src/buildstream/_scheduler/jobs/job.py | 2 +-
src/buildstream/_sourcecache.py | 15 +-
src/buildstream/_stream.py | 608 +++++++----
src/buildstream/_utils.pyi | 2 +
src/buildstream/_utils.pyx | 50 +
src/buildstream/_workspaces.py | 10 +-
src/buildstream/buildelement.py | 2 +-
src/buildstream/data/userconfig.yaml | 55 +-
src/buildstream/element.py | 58 +-
src/buildstream/node.pxd | 2 +-
src/buildstream/node.pyi | 84 +-
src/buildstream/node.pyx | 82 +-
src/buildstream/plugins/elements/junction.py | 13 +-
src/buildstream/plugins/elements/stack.py | 2 +-
src/buildstream/sandbox/_sandboxremote.py | 180 +---
src/buildstream/testing/runcli.py | 4 +-
src/buildstream/types.py | 22 +-
tests/artifactcache/capabilities.py | 6 +-
tests/artifactcache/config.py | 130 +--
tests/artifactcache/junctions.py | 159 +--
tests/artifactcache/pull.py | 15 +-
tests/artifactcache/push.py | 24 +-
tests/format/project.py | 7 +-
tests/frontend/artifact_checkout.py | 84 ++
tests/frontend/artifact_delete.py | 11 +-
tests/frontend/artifact_list_contents.py | 107 +-
tests/frontend/artifact_log.py | 34 +-
tests/frontend/artifact_pull.py | 83 ++
tests/frontend/artifact_show.py | 9 +-
tests/frontend/buildcheckout.py | 6 +-
tests/frontend/completions.py | 13 +-
tests/frontend/default_target.py | 2 +-
tests/frontend/help.py | 4 +-
tests/frontend/large_directory.py | 2 +-
.../foo.bst => project/elements/target-import.bst} | 4 +
tests/frontend/pull.py | 68 +-
tests/frontend/push.py | 99 +-
tests/frontend/remote-caches.py | 8 +-
tests/frontend/workspace.py | 2 +-
tests/integration/artifact.py | 6 +-
tests/integration/cachedfail.py | 8 +-
tests/integration/pullbuildtrees.py | 16 +-
tests/integration/shell.py | 2 +-
tests/integration/shellbuildtrees.py | 12 +-
tests/internals/yaml.py | 31 +
tests/internals/yaml/list-of-dict.yaml | 5 +
tests/internals/yaml/list-of-list.yaml | 3 +
tests/remotecache/simple.py | 2 +-
tests/remoteexecution/buildtree.py | 4 +-
tests/remoteexecution/partial.py | 5 +-
tests/sandboxes/remote-exec-config.py | 63 +-
.../missing-certs}/project.conf | 0
tests/sourcecache/capabilities.py | 12 +-
tests/sourcecache/fetch.py | 6 +-
tests/sourcecache/push.py | 20 +-
tests/sourcecache/workspace.py | 6 +-
96 files changed, 4230 insertions(+), 2493 deletions(-)
create mode 100644 .github/common.env
create mode 100644 .github/compose/buildbarn-config/asset.jsonnet
create mode 100644 .github/compose/buildbarn-config/storage.jsonnet
create mode 100644 .github/compose/ci.buildbarn-remote-cache.yml
create mode 100644 .github/compose/ci.buildstream-remote-cache.yml
create mode 100644 .github/compose/ci.docker-compose.yml
create mode 100644 .github/compose/ci.remote-execution.yml
delete mode 100644 src/buildstream/_loader/_loader.pyi
delete mode 100644 src/buildstream/_loader/_loader.pyx
create mode 100644 src/buildstream/_remotespec.py
delete mode 100644 src/buildstream/_scheduler/jobs/_job.pyi
delete mode 100644 src/buildstream/_scheduler/jobs/_job.pyx
create mode 100644 tests/frontend/artifact_checkout.py
create mode 100644 tests/frontend/artifact_pull.py
copy tests/frontend/{logging/elements/foo.bst => project/elements/target-import.bst} (54%)
create mode 100644 tests/internals/yaml/list-of-dict.yaml
create mode 100644 tests/internals/yaml/list-of-list.yaml
copy tests/sandboxes/{missing-command => remote-exec-config/missing-certs}/project.conf (100%)
[buildstream] 03/03: job.py: Completely remove the pipe between
child and parent process
Posted by be...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
benschubert pushed a commit to branch bschubert/remove-pipe-job
in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit fa32cda5aaf57504722f07ac47ba1f6bfa0f1800
Author: Benjamin Schubert <co...@benschubert.me>
AuthorDate: Mon Jan 11 12:00:07 2021 +0000
job.py: Completely remove the pipe between child and parent process
This pipe is not needed at all anymore
---
src/buildstream/_scheduler/jobs/job.py | 80 ++--------------------------------
1 file changed, 3 insertions(+), 77 deletions(-)
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index e7ac23f..08f1a66 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -24,7 +24,6 @@
import asyncio
import datetime
import itertools
-import multiprocessing
import threading
import traceback
@@ -113,8 +112,6 @@ class Job:
#
self._scheduler = scheduler # The scheduler
self._messenger = self._scheduler.context.messenger
- self._pipe_r = None # The read end of a pipe for message passing
- self._listening = False # Whether the parent is currently listening
self._suspended = False # Whether this job is currently suspended
self._max_retries = max_retries # Maximum number of automatic retries
self._result = None # Return value of child action in the parent
@@ -143,11 +140,7 @@ class Job:
assert not self._terminated, "Attempted to start process which was already terminated"
- # FIXME: remove this, this is not necessary when using asyncio
- self._pipe_r, pipe_w = multiprocessing.Pipe(duplex=False)
-
self._tries += 1
- self._parent_start_listening()
# FIXME: remove the parent/child separation, it's not needed anymore.
self._child = self.create_child_job( # pylint: disable=assignment-from-no-return
@@ -164,7 +157,7 @@ class Job:
loop = asyncio.get_event_loop()
async def execute():
- ret_code, self._result = await loop.run_in_executor(None, self._child.child_action, pipe_w)
+ ret_code, self._result = await loop.run_in_executor(None, self._child.child_action)
await self._parent_child_completed(ret_code)
self._task = loop.create_task(execute())
@@ -178,9 +171,6 @@ class Job:
def terminate(self):
self.message(MessageType.STATUS, "{} terminating".format(self.action_name))
- # Make sure there is no garbage on the pipe
- self._parent_stop_listening()
-
if self._task:
self._child.terminate()
@@ -289,16 +279,6 @@ class Job:
# Local Private Methods #
#######################################################
- # _parent_shutdown()
- #
- # Shuts down the Job on the parent side by reading any remaining
- # messages on the message pipe and cleaning up any resources.
- #
- def _parent_shutdown(self):
- # Make sure we've read everything we need and then stop listening
- self._parent_process_pipe()
- self._parent_stop_listening()
-
# _parent_child_completed()
#
# Called in the main process courtesy of asyncio's ChildWatcher.add_child_handler()
@@ -307,8 +287,6 @@ class Job:
# returncode (int): The return code of the child process
#
async def _parent_child_completed(self, returncode):
- self._parent_shutdown()
-
try:
returncode = _ReturnCode(returncode)
except ValueError:
@@ -347,50 +325,7 @@ class Job:
self.parent_complete(status, self._result)
self._scheduler.job_completed(self, status)
-
- # Force the deletion of the pipe and process objects to try and clean up FDs
- self._pipe_r.close()
- self._pipe_r = self._task = None
-
- # _parent_process_pipe()
- #
- # Reads back message envelopes from the message pipe
- # in the parent process.
- #
- def _parent_process_pipe(self):
- while self._pipe_r.poll():
- try:
- self._pipe_r.recv()
- assert False, "No message should be received anymore"
- except EOFError:
- self._parent_stop_listening()
- break
-
- # _parent_recv()
- #
- # A callback to handle I/O events from the message
- # pipe file descriptor in the main process message loop
- #
- def _parent_recv(self, *args):
- self._parent_process_pipe()
-
- # _parent_start_listening()
- #
- # Starts listening on the message pipe
- #
- def _parent_start_listening(self):
- if not self._listening:
- self._scheduler.loop.add_reader(self._pipe_r.fileno(), self._parent_recv)
- self._listening = True
-
- # _parent_stop_listening()
- #
- # Stops listening on the message pipe
- #
- def _parent_stop_listening(self):
- if self._listening:
- self._scheduler.loop.remove_reader(self._pipe_r.fileno())
- self._listening = False
+ self._task = None
# ChildJob()
@@ -431,7 +366,6 @@ class ChildJob:
self._message_element_name = message_element_name
self._message_element_key = message_element_key
- self._pipe_w = None # The write end of a pipe for message passing
self._thread_id = None # Thread in which the child executes its action
self._should_terminate = False
self._terminate_lock = threading.Lock()
@@ -483,15 +417,9 @@ class ChildJob:
#
# Perform the action in the child process, this calls the action_cb.
#
- # Args:
- # pipe_w (multiprocessing.connection.Connection): The message pipe for IPC
- #
- def child_action(self, pipe_w):
- # Assign the pipe we passed across the process boundaries
- #
+ def child_action(self):
# Set the global message handler in this child
# process to forward messages to the parent process
- self._pipe_w = pipe_w
self._messenger.setup_new_action_context(
self.action_name, self._message_element_name, self._message_element_key
)
@@ -572,8 +500,6 @@ class ChildJob:
except TerminateException:
self._thread_id = None
return _ReturnCode.TERMINATED, None
- finally:
- self._pipe_w.close()
# terminate()
#
[buildstream] 02/03: job.py: Stop using the queue to send data
between the child and parent
Posted by be...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
benschubert pushed a commit to branch bschubert/remove-pipe-job
in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit a05e05aad44c409546e265280d1ad388cc5fdd85
Author: Benjamin Schubert <co...@benschubert.me>
AuthorDate: Mon Jan 11 11:48:35 2021 +0000
job.py: Stop using the queue to send data between the child and parent
This removes the need to have all messages processed in the master
thread, and instead allows them to be done in any thread.
* _messenger.py:
- Store optional job information in the thread local
storage and expand the message with it if it is present.
- Make the message handler something global and remove the need to
have a thread-specific one.
- Have message filter out silenced and LOG messages from jobs
* job.py: Remove the job-specific message handler
---
src/buildstream/_messenger.py | 56 ++++++++++++++++++++++++++--------
src/buildstream/_scheduler/jobs/job.py | 45 ++++-----------------------
2 files changed, 49 insertions(+), 52 deletions(-)
diff --git a/src/buildstream/_messenger.py b/src/buildstream/_messenger.py
index 01a8cfd..edb79ec 100644
--- a/src/buildstream/_messenger.py
+++ b/src/buildstream/_messenger.py
@@ -25,7 +25,7 @@ from typing import Optional, Callable, Iterator, TextIO
from . import _signals
from ._exceptions import BstError
-from ._message import Message, MessageType
+from ._message import Message, MessageType, unconditional_messages
from ._state import State, Task
@@ -48,6 +48,13 @@ class _TimeData:
self.start_time: datetime.datetime = start_time
+class _JobInfo:
+ def __init__(self, action_name: str, element_name: str, element_key: str) -> None:
+ self.action_name = action_name
+ self.element_name = element_name
+ self.element_key = element_key
+
+
# _MessengerLocal
#
# Thread local storage for the messenger
@@ -56,13 +63,6 @@ class _MessengerLocal(threading.local):
def __init__(self) -> None:
super().__init__()
- # The callback to call when propagating messages
- #
- # FIXME: The message handler is currently not strongly typed,
- # as it uses a kwarg, we cannot declare it with Callable.
- # We can use `Protocol` to strongly type this with python >= 3.8
- self.message_handler = None
-
# The open file handle for this task
self.log_handle: Optional[TextIO] = None
@@ -72,6 +72,9 @@ class _MessengerLocal(threading.local):
# Level of silent messages depth in this task
self.silence_scope_depth: int = 0
+ # Job
+ self.job: Optional[_JobInfo] = None
+
# Messenger()
#
@@ -97,8 +100,16 @@ class Messenger:
# Thread local storage
self._locals: _MessengerLocal = _MessengerLocal()
- def setup_new_action_context(self) -> None:
+ # The callback to call when propagating messages
+ #
+ # FIXME: The message handler is currently not strongly typed,
+ # as it uses a kwarg, we cannot declare it with Callable.
+ # We can use `Protocol` to strongly type this with python >= 3.8
+ self._message_handler = None
+
+ def setup_new_action_context(self, action_name: str, element_name: str, element_key: str) -> None:
self._locals.silence_scope_depth = 0
+ self._locals.job = _JobInfo(action_name, element_name, element_key)
# set_message_handler()
#
@@ -106,7 +117,7 @@ class Messenger:
# the messenger.
#
def set_message_handler(self, handler) -> None:
- self._locals.message_handler = handler
+ self._message_handler = handler
# set_state()
#
@@ -140,12 +151,31 @@ class Messenger:
# If we are recording messages, dump a copy into the open log file.
self._record_message(message)
+ # Always add the log filename automatically
+ message.logfile = self._locals.log_filename
+
+ is_silenced = self._silent_messages()
+ job = self._locals.job
+
+ if job is not None:
+ # Automatically add message information from the job context
+ message.action_name = job.action_name
+ message.task_element_name = job.element_name
+ message.task_element_key = job.element_key
+
+ # Don't forward LOG messages from jobs
+ if message.message_type == MessageType.LOG:
+ return
+
+ # Don't forward JOB messages if they are currently silent
+ if is_silenced and (message.message_type not in unconditional_messages):
+ return
+
# Send it off to the log handler (can be the frontend,
# or it can be the child task which will propagate
# to the frontend)
- assert self._locals.message_handler
-
- self._locals.message_handler(message, is_silenced=self._silent_messages())
+ assert self._message_handler
+ self._message_handler(message, is_silenced=is_silenced)
# status():
#
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index 6cd11ee..e7ac23f 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -32,7 +32,7 @@ import traceback
from ... import utils
from ..._utils import terminate_thread
from ..._exceptions import ImplError, BstError, set_last_task_error, SkipJob
-from ..._message import Message, MessageType, unconditional_messages
+from ..._message import Message, MessageType
from ...types import FastEnum
from ..._signals import TerminateException
@@ -360,13 +360,12 @@ class Job:
def _parent_process_pipe(self):
while self._pipe_r.poll():
try:
- message = self._pipe_r.recv()
+ self._pipe_r.recv()
+ assert False, "No message should be received anymore"
except EOFError:
self._parent_stop_listening()
break
- self._messenger.message(message)
-
# _parent_recv()
#
# A callback to handle I/O events from the message
@@ -493,8 +492,9 @@ class ChildJob:
# Set the global message handler in this child
# process to forward messages to the parent process
self._pipe_w = pipe_w
- self._messenger.setup_new_action_context()
- self._messenger.set_message_handler(self._child_message_handler)
+ self._messenger.setup_new_action_context(
+ self.action_name, self._message_element_name, self._message_element_key
+ )
# Time, log and and run the action function
#
@@ -593,36 +593,3 @@ class ChildJob:
return
terminate_thread(self._thread_id)
-
- #######################################################
- # Local Private Methods #
- #######################################################
-
- # _child_message_handler()
- #
- # A Context delegate for handling messages, this replaces the
- # frontend's main message handler in the context of a child task
- # and performs local logging to the local log file before sending
- # the message back to the parent process for further propagation.
- # The related element display key is added to the message for
- # widget rendering if not already set for an element childjob.
- #
- # Args:
- # message (Message): The message to log
- # is_silenced (bool) : Whether messages are silenced
- #
- def _child_message_handler(self, message, is_silenced):
-
- message.action_name = self.action_name
- message.task_element_name = self._message_element_name
- message.task_element_key = self._message_element_key
-
- # Send to frontend if appropriate
- if is_silenced and (message.message_type not in unconditional_messages):
- return
-
- # Don't bother propagating these to the frontend
- if message.message_type == MessageType.LOG:
- return
-
- self._pipe_w.send(message)
[buildstream] 01/03: _messenger.py: Add a hook to explicitely start
a new action context
Posted by be...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
benschubert pushed a commit to branch bschubert/remove-pipe-job
in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit 13ac06015f642cc61392c3e6f5f49e81a0a023cd
Author: Benjamin Schubert <co...@benschubert.me>
AuthorDate: Mon Jan 11 09:34:56 2021 +0000
_messenger.py: Add a hook to explicitely start a new action context
This hook overrides the locals from the messenger, and ensures that we
do not leak information between actions.
This is required, as we reuse the threads between actions.
We used to re-initialize it in the `recorded_messages` method, but that
is an odd side effect.
* job.py: Make a new context upon starting the new job
---
src/buildstream/_messenger.py | 4 +++-
src/buildstream/_scheduler/jobs/job.py | 1 +
2 files changed, 4 insertions(+), 1 deletion(-)
diff --git a/src/buildstream/_messenger.py b/src/buildstream/_messenger.py
index 3bd98cd..01a8cfd 100644
--- a/src/buildstream/_messenger.py
+++ b/src/buildstream/_messenger.py
@@ -97,6 +97,9 @@ class Messenger:
# Thread local storage
self._locals: _MessengerLocal = _MessengerLocal()
+ def setup_new_action_context(self) -> None:
+ self._locals.silence_scope_depth = 0
+
# set_message_handler()
#
# Sets the handler for any status messages propagated through
@@ -362,7 +365,6 @@ class Messenger:
# Create the fully qualified logfile in the log directory,
# appending the pid and .log extension at the end.
self._locals.log_filename = os.path.join(logdir, "{}.{}.log".format(filename, os.getpid()))
- self._locals.silence_scope_depth = 0
# Ensure the directory exists first
directory = os.path.dirname(self._locals.log_filename)
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index c875afe..6cd11ee 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -493,6 +493,7 @@ class ChildJob:
# Set the global message handler in this child
# process to forward messages to the parent process
self._pipe_w = pipe_w
+ self._messenger.setup_new_action_context()
self._messenger.set_message_handler(self._child_message_handler)
# Time, log and and run the action function