You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2021/06/10 21:49:06 UTC
[airflow] branch main updated: Refactor: `SKIPPED` shouldn't be logged again as `SUCCESS`. (#14822)
This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new dbeec89 Refactor: `SKIPPED` shouldn't be logged again as `SUCCESS`. (#14822)
dbeec89 is described below
commit dbeec896fd752f266d1fd9950ba9220d415231b9
Author: suiting-young <80...@users.noreply.github.com>
AuthorDate: Fri Jun 11 05:48:51 2021 +0800
Refactor: `SKIPPED` shouldn't be logged again as `SUCCESS`. (#14822)
* `SKIPPED` shouldn't be logged again as `SUCCESS`.
* `_safe_date` duplicated `_date_or_empty`, so `_safe_date` was removed.
* `_date_or_empty` now uses the `getattr`-with-default lookup borrowed from `_safe_date`.
---
airflow/models/taskinstance.py | 65 +++++++++++++-----------------------------
1 file changed, 20 insertions(+), 45 deletions(-)
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 4fd72e7..b4e644e 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -1087,12 +1087,23 @@ class TaskInstance(Base, LoggingMixin): # pylint: disable=R0902,R0904
self.log.info("Executing %s on %s", self.task, self.execution_date)
return True
- def _date_or_empty(self, attr):
- if hasattr(self, attr):
- date = getattr(self, attr)
- if date:
- return date.strftime('%Y%m%dT%H%M%S')
- return ''
+ def _date_or_empty(self, attr: str):
+ result = getattr(self, attr, None) # type: datetime
+ return result.strftime('%Y%m%dT%H%M%S') if result else ''
+
+ def _log_state(self, lead_msg: str = ''):
+ self.log.info(
+ '%sMarking task as %s.'
+ + ' dag_id=%s, task_id=%s,'
+ + ' execution_date=%s, start_date=%s, end_date=%s',
+ lead_msg,
+ self.state.upper(),
+ self.dag_id,
+ self.task_id,
+ self._date_or_empty('execution_date'),
+ self._date_or_empty('start_date'),
+ self._date_or_empty('end_date'),
+ )
@provide_session
@Sentry.enrich_errors
@@ -1147,15 +1158,6 @@ class TaskInstance(Base, LoggingMixin): # pylint: disable=R0902,R0904
self.log.info(e)
self.refresh_from_db(lock_for_update=True)
self.state = State.SKIPPED
- self.log.info(
- 'Marking task as SKIPPED. '
- 'dag_id=%s, task_id=%s, execution_date=%s, start_date=%s, end_date=%s',
- self.dag_id,
- self.task_id,
- self._date_or_empty('execution_date'),
- self._date_or_empty('start_date'),
- self._date_or_empty('end_date'),
- )
except AirflowRescheduleException as reschedule_exception:
self.refresh_from_db()
self._handle_reschedule(actual_start_date, reschedule_exception, test_mode)
@@ -1181,17 +1183,9 @@ class TaskInstance(Base, LoggingMixin): # pylint: disable=R0902,R0904
finally:
Stats.incr(f'ti.finish.{task.dag_id}.{task.task_id}.{self.state}')
- # Recording SUCCESS
+ # Recording SKIPPED or SUCCESS
self.end_date = timezone.utcnow()
- self.log.info(
- 'Marking task as SUCCESS. '
- 'dag_id=%s, task_id=%s, execution_date=%s, start_date=%s, end_date=%s',
- self.dag_id,
- self.task_id,
- self._date_or_empty('execution_date'),
- self._date_or_empty('start_date'),
- self._date_or_empty('end_date'),
- )
+ self._log_state()
self.set_duration()
if not test_mode:
session.add(Log(self.state, self))
@@ -1458,25 +1452,12 @@ class TaskInstance(Base, LoggingMixin): # pylint: disable=R0902,R0904
if force_fail or not self.is_eligible_to_retry():
self.state = State.FAILED
- if force_fail:
- log_message = "Immediate failure requested. Marking task as FAILED."
- else:
- log_message = "Marking task as FAILED."
email_for_state = task.email_on_failure
else:
self.state = State.UP_FOR_RETRY
- log_message = "Marking task as UP_FOR_RETRY."
email_for_state = task.email_on_retry
- self.log.info(
- '%s dag_id=%s, task_id=%s, execution_date=%s, start_date=%s, end_date=%s',
- log_message,
- self.dag_id,
- self.task_id,
- self._safe_date('execution_date', '%Y%m%dT%H%M%S'),
- self._safe_date('start_date', '%Y%m%dT%H%M%S'),
- self._safe_date('end_date', '%Y%m%dT%H%M%S'),
- )
+ self._log_state('Immediate failure requested. ' if force_fail else '')
if email_for_state and task.email:
try:
self.email_alert(error)
@@ -1502,12 +1483,6 @@ class TaskInstance(Base, LoggingMixin): # pylint: disable=R0902,R0904
"""Is task instance is eligible for retry"""
return self.task.retries and self.try_number <= self.max_tries
- def _safe_date(self, date_attr, fmt):
- result = getattr(self, date_attr, None)
- if result is not None:
- return result.strftime(fmt)
- return ''
-
@provide_session
def get_template_context(self, session=None) -> Context: # pylint: disable=too-many-locals
"""Return TI Context"""