Posted to commits@airflow.apache.org by ka...@apache.org on 2021/06/10 21:49:06 UTC

[airflow] branch main updated: Refactor: `SKIPPED` shouldn't be logged again as `SUCCESS`. (#14822)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new dbeec89  Refactor: `SKIPPED` shouldn't be logged again as `SUCCESS`. (#14822)
dbeec89 is described below

commit dbeec896fd752f266d1fd9950ba9220d415231b9
Author: suiting-young <80...@users.noreply.github.com>
AuthorDate: Fri Jun 11 05:48:51 2021 +0800

    Refactor: `SKIPPED` shouldn't be logged again as `SUCCESS`. (#14822)
    
    * `SKIPPED` shouldn't be logged again as `SUCCESS`.
    
    * `_safe_date` duplicates `_date_or_empty`.

    * Adopted the simpler implementation from `_safe_date`.
---
 airflow/models/taskinstance.py | 65 +++++++++++++-----------------------------
 1 file changed, 20 insertions(+), 45 deletions(-)

diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 4fd72e7..b4e644e 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -1087,12 +1087,23 @@ class TaskInstance(Base, LoggingMixin):  # pylint: disable=R0902,R0904
                 self.log.info("Executing %s on %s", self.task, self.execution_date)
         return True
 
-    def _date_or_empty(self, attr):
-        if hasattr(self, attr):
-            date = getattr(self, attr)
-            if date:
-                return date.strftime('%Y%m%dT%H%M%S')
-        return ''
+    def _date_or_empty(self, attr: str):
+        result = getattr(self, attr, None)  # type: datetime
+        return result.strftime('%Y%m%dT%H%M%S') if result else ''
+
+    def _log_state(self, lead_msg: str = ''):
+        self.log.info(
+            '%sMarking task as %s.'
+            + ' dag_id=%s, task_id=%s,'
+            + ' execution_date=%s, start_date=%s, end_date=%s',
+            lead_msg,
+            self.state.upper(),
+            self.dag_id,
+            self.task_id,
+            self._date_or_empty('execution_date'),
+            self._date_or_empty('start_date'),
+            self._date_or_empty('end_date'),
+        )
 
     @provide_session
     @Sentry.enrich_errors
@@ -1147,15 +1158,6 @@ class TaskInstance(Base, LoggingMixin):  # pylint: disable=R0902,R0904
                 self.log.info(e)
             self.refresh_from_db(lock_for_update=True)
             self.state = State.SKIPPED
-            self.log.info(
-                'Marking task as SKIPPED. '
-                'dag_id=%s, task_id=%s, execution_date=%s, start_date=%s, end_date=%s',
-                self.dag_id,
-                self.task_id,
-                self._date_or_empty('execution_date'),
-                self._date_or_empty('start_date'),
-                self._date_or_empty('end_date'),
-            )
         except AirflowRescheduleException as reschedule_exception:
             self.refresh_from_db()
             self._handle_reschedule(actual_start_date, reschedule_exception, test_mode)
@@ -1181,17 +1183,9 @@ class TaskInstance(Base, LoggingMixin):  # pylint: disable=R0902,R0904
         finally:
             Stats.incr(f'ti.finish.{task.dag_id}.{task.task_id}.{self.state}')
 
-        # Recording SUCCESS
+        # Recording SKIPPED or SUCCESS
         self.end_date = timezone.utcnow()
-        self.log.info(
-            'Marking task as SUCCESS. '
-            'dag_id=%s, task_id=%s, execution_date=%s, start_date=%s, end_date=%s',
-            self.dag_id,
-            self.task_id,
-            self._date_or_empty('execution_date'),
-            self._date_or_empty('start_date'),
-            self._date_or_empty('end_date'),
-        )
+        self._log_state()
         self.set_duration()
         if not test_mode:
             session.add(Log(self.state, self))
@@ -1458,25 +1452,12 @@ class TaskInstance(Base, LoggingMixin):  # pylint: disable=R0902,R0904
 
         if force_fail or not self.is_eligible_to_retry():
             self.state = State.FAILED
-            if force_fail:
-                log_message = "Immediate failure requested. Marking task as FAILED."
-            else:
-                log_message = "Marking task as FAILED."
             email_for_state = task.email_on_failure
         else:
             self.state = State.UP_FOR_RETRY
-            log_message = "Marking task as UP_FOR_RETRY."
             email_for_state = task.email_on_retry
 
-        self.log.info(
-            '%s dag_id=%s, task_id=%s, execution_date=%s, start_date=%s, end_date=%s',
-            log_message,
-            self.dag_id,
-            self.task_id,
-            self._safe_date('execution_date', '%Y%m%dT%H%M%S'),
-            self._safe_date('start_date', '%Y%m%dT%H%M%S'),
-            self._safe_date('end_date', '%Y%m%dT%H%M%S'),
-        )
+        self._log_state('Immediate failure requested. ' if force_fail else '')
         if email_for_state and task.email:
             try:
                 self.email_alert(error)
@@ -1502,12 +1483,6 @@ class TaskInstance(Base, LoggingMixin):  # pylint: disable=R0902,R0904
         """Is task instance is eligible for retry"""
         return self.task.retries and self.try_number <= self.max_tries
 
-    def _safe_date(self, date_attr, fmt):
-        result = getattr(self, date_attr, None)
-        if result is not None:
-            return result.strftime(fmt)
-        return ''
-
     @provide_session
     def get_template_context(self, session=None) -> Context:  # pylint: disable=too-many-locals
         """Return TI Context"""