You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by tv...@apache.org on 2021/02/04 08:15:39 UTC
[buildstream] 04/13: _messenger.py: Make `timed_suspendable` public and use it in job.py
This is an automated email from the ASF dual-hosted git repository.
tvb pushed a commit to branch bschubert/no-multiprocessing-full
in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit 349f97d53817a992a5bc744b29a9012f9f2b7b77
Author: Benjamin Schubert <co...@benschubert.me>
AuthorDate: Sat Jul 4 12:13:24 2020 +0000
_messenger.py: Make `timed_suspendable` public and use it in job.py
This reduces the amount of code duplication
---
src/buildstream/_messenger.py | 60 +++++++++++++++++-----------------
src/buildstream/_scheduler/jobs/job.py | 23 +++----------
2 files changed, 35 insertions(+), 48 deletions(-)
diff --git a/src/buildstream/_messenger.py b/src/buildstream/_messenger.py
index 222b05d..84bea6a 100644
--- a/src/buildstream/_messenger.py
+++ b/src/buildstream/_messenger.py
@@ -161,7 +161,7 @@ class Messenger:
#
@contextmanager
def timed_activity(self, activity_name, *, element_name=None, detail=None, silent_nested=False):
- with self._timed_suspendable() as timedata:
+ with self.timed_suspendable() as timedata:
try:
# Push activity depth for status messages
message = Message(MessageType.START, activity_name, detail=detail, element_name=element_name)
@@ -205,7 +205,7 @@ class Messenger:
if not full_name:
full_name = activity_name
- with self._timed_suspendable() as timedata:
+ with self.timed_suspendable() as timedata:
try:
message = Message(MessageType.START, activity_name, element_name=element_name)
self.message(message)
@@ -326,6 +326,34 @@ class Messenger:
def get_log_filename(self):
return self._locals.log_filename
+ # timed_suspendable()
+ #
+ # A contextmanager that allows an activity to be suspended and can
+ # adjust for clock drift caused by suspending
+ #
+ # Yields:
+ # TimeData: An object that contains the time the activity started
+ #
+ @contextmanager
+ def timed_suspendable(self):
+ # Note: timedata needs to be in a namedtuple so that values can be
+ # yielded that will change
+ timedata = _TimeData(start_time=datetime.datetime.now())
+ stopped_time = None
+
+ def stop_time():
+ nonlocal stopped_time
+ stopped_time = datetime.datetime.now()
+
+ def resume_time():
+ nonlocal timedata
+ nonlocal stopped_time
+ sleep_time = datetime.datetime.now() - stopped_time
+ timedata.start_time += sleep_time
+
+ with _signals.suspendable(stop_time, resume_time):
+ yield timedata
+
# _record_message()
#
# Records the message if recording is enabled
@@ -388,31 +416,3 @@ class Messenger:
if self._render_status_cb and now >= self._next_render:
self._render_status_cb()
self._next_render = now + _RENDER_INTERVAL
-
- # _timed_suspendable()
- #
- # A contextmanager that allows an activity to be suspended and can
- # adjust for clock drift caused by suspending
- #
- # Yields:
- # TimeData: An object that contains the time the activity started
- #
- @contextmanager
- def _timed_suspendable(self):
- # Note: timedata needs to be in a namedtuple so that values can be
- # yielded that will change
- timedata = _TimeData(start_time=datetime.datetime.now())
- stopped_time = None
-
- def stop_time():
- nonlocal stopped_time
- stopped_time = datetime.datetime.now()
-
- def resume_time():
- nonlocal timedata
- nonlocal stopped_time
- sleep_time = datetime.datetime.now() - stopped_time
- timedata.start_time += sleep_time
-
- with _signals.suspendable(stop_time, resume_time):
- yield timedata
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index a4ace41..03a6b61 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -30,7 +30,6 @@ import traceback
from ..._exceptions import ImplError, BstError, set_last_task_error, SkipJob
from ..._message import Message, MessageType, unconditional_messages
from ...types import FastEnum
-from ... import _signals
# Return code values shutdown of job handling child processes
@@ -603,21 +602,9 @@ class ChildJob:
self._pipe_w = pipe_w
self._messenger.set_message_handler(self._child_message_handler)
- starttime = datetime.datetime.now()
- stopped_time = None
-
- def stop_time():
- nonlocal stopped_time
- stopped_time = datetime.datetime.now()
-
- def resume_time():
- nonlocal stopped_time
- nonlocal starttime
- starttime += datetime.datetime.now() - stopped_time
-
# Time, log and and run the action function
#
- with _signals.suspendable(stop_time, resume_time), self._messenger.recorded_messages(
+ with self._messenger.timed_suspendable() as timeinfo, self._messenger.recorded_messages(
self._logfile, self._logdir
) as filename:
self.message(MessageType.START, self.action_name, logfile=filename)
@@ -626,13 +613,13 @@ class ChildJob:
# Try the task action
result = self.child_process() # pylint: disable=assignment-from-no-return
except SkipJob as e:
- elapsed = datetime.datetime.now() - starttime
+ elapsed = datetime.datetime.now() - timeinfo.start_time
self.message(MessageType.SKIPPED, str(e), elapsed=elapsed, logfile=filename)
# Alert parent of skip by return code
return _ReturnCode.SKIPPED
except BstError as e:
- elapsed = datetime.datetime.now() - starttime
+ elapsed = datetime.datetime.now() - timeinfo.start_time
retry_flag = e.temporary
if retry_flag and (self._tries <= self._max_retries):
@@ -662,7 +649,7 @@ class ChildJob:
# send the traceback and formatted exception back to the frontend
# and print it to the log file.
#
- elapsed = datetime.datetime.now() - starttime
+ elapsed = datetime.datetime.now() - timeinfo.start_time
detail = "An unhandled exception occured:\n\n{}".format(traceback.format_exc())
self.message(MessageType.BUG, self.action_name, elapsed=elapsed, detail=detail, logfile=filename)
@@ -674,7 +661,7 @@ class ChildJob:
self._send_message(_MessageType.CHILD_DATA, self.child_process_data())
self._child_send_result(result)
- elapsed = datetime.datetime.now() - starttime
+ elapsed = datetime.datetime.now() - timeinfo.start_time
self.message(MessageType.SUCCESS, self.action_name, elapsed=elapsed, logfile=filename)
# Shutdown needs to stay outside of the above context manager,